之前在用celery的时候,通常都是把任务直接丢进broker里异步处理的。今天网上看到篇文章说应该根据任务分类的不同分别使用不同的队列queue来处理。譬如说两种不同的任务,一个量多但不关键,一个量少但较为重要,如果放在同一个queue中,那么或多或少会带来相互之间的影响。所以应该将任务一放到queue1,而将另外的一个任务放在queue2中。这种方式我感觉很有道理诶。。。
然后查了下文档,用官方文档上的例子学习并记录下来。
目录结构如下
proj ├── celery.py ├── celery.pyc ├── __init__.py ├── __init__.pyc ├── tasks.py └── tasks.pyc 0 directories, 6 files
主要的celery.py代码如下
#!/usr/bin/env python #-*- encoding: utf-8 -*- #celery.py from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='redis://192.168.188.129:9379/0', #broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, task_routes = { 'proj.tasks.add': {'queue': 'addtask'}, 'proj.tasks.mul': {'queue': 'multask'}, }, ) if __name__ == '__main__': app.start()
tasks.py的代码如下
#!/usr/bin/env python #-*- encoding: utf-8 -*- #tasks.py from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
然后我们分别起两个worker,一个对应队列addtask,另外一个对应队列multask
[hu@server01 celery]$ pwd /home/hu/celery [hu@server01 celery]$ celery -A proj worker --loglevel=info -Q addtask [hu@server01 celery]$ celery -A proj worker --loglevel=info -Q multask
然后试着用下
>>> from proj import tasks >>> tasks.add.apply_async((4,49),queue='addtask') <AsyncResult: 02793a5e-c1f6-41cd-8160-f20a5f7b652c> >>> tasks.mul.apply_async((2,7),queue='addtask') <AsyncResult: 94ef93e2-b2b1-4351-8d4e-9bb8b1e40092> >>> tasks.mul.apply_async((2,7),queue='multask') <AsyncResult: b0cdf415-403d-4ac8-9391-1c4ccae55708>
这样就能够在前面起的两个worker打印的日志上分别看到他们是由指定的不同queue上取任务来工作的。
#queue为 addtask的worker [2017-04-07 15:46:57,760: INFO/MainProcess] Received task: proj.tasks.add[02793a5e-c1f6-41cd-8160-f20a5f7b652c] [2017-04-07 15:46:57,808: INFO/PoolWorker-1] Task proj.tasks.add[02793a5e-c1f6-41cd-8160-f20a5f7b652c] succeeded in 0.00718647900067s: 53 [2017-04-07 15:47:17,840: INFO/MainProcess] Received task: proj.tasks.mul[94ef93e2-b2b1-4351-8d4e-9bb8b1e40092] [2017-04-07 15:47:17,911: INFO/PoolWorker-1] Task proj.tasks.mul[94ef93e2-b2b1-4351-8d4e-9bb8b1e40092] succeeded in 0.0686908099997s: 14 #queue为 multask的worker [2017-04-07 15:47:26,959: INFO/MainProcess] Received task: proj.tasks.mul[b0cdf415-403d-4ac8-9391-1c4ccae55708] [2017-04-07 15:47:26,997: INFO/PoolWorker-1] Task proj.tasks.mul[b0cdf415-403d-4ac8-9391-1c4ccae55708] succeeded in 0.0344336039998s: 14
Cloudhu 个人随笔|built by django|
沪ICP备16019452号-1