[关闭]
@nemos 2017-05-05T15:01:42.000000Z 字数 1273 阅读 913

celery

py pkg


分布式队列,用于执行异步任务

快速使用

  1. # tasks.py
  2. import time
  3. from celery import Celery
  4. broker = 'redis://127.0.0.1:6379' # 消息中间用于消息队列
  5. backend = 'redis://127.0.0.1:6379/0' # 结果存储
  6. # 创建celery实例
  7. app = Celery('my_task', broker=broker, backend=backend)
  8. @app.task # 声明需要执行的任务
  9. def add(x, y):
  10. time.sleep(5) # 模拟耗时操作
  11. return x + y

celery worker -A tasks --loglevel=info 启动实例

  1. from tasks import add
  2. result = add.delay(2, 6) # 异步执行若任务,不会阻塞
  3. result.ready() # 使用 ready() 判断任务是否执行完毕
  4. False
  5. result.get() # 使用 get() 获取任务结果
  6. 8

详细说明

celery

应用目录

  1. celery_demo # 项目根目录
  2. ├── celery_app # 存放 celery 相关文件
  3. ├── __init__.py
  4. ├── celeryconfig.py # 配置文件
  5. └── task.py # 任务文件
  6. └── client.py # 应用程序
  1. # __init__.py
  2. from celery import Celery
  3. app = Celery('demo') # 创建 Celery 实例
  4. app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块
  1. # celeryconfig.py
  2. BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
  3. CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend
  4. CELERY_TIMEZONE='Asia/Shanghai' # 指定时
  5. CELERY_IMPORTS = ( # 指定导入的任务模块
  6. 'celery_app.task',
  7. )
  8. # 用于定时任务
  9. CELERYBEAT_SCHEDULE = {
  10. 'add-every-30-seconds': {
  11. 'task': 'celery_app.task.add',
  12. 'schedule': timedelta(seconds=30), # 每 30 秒执行一次
  13. 'args': (5, 8) # 任务函数参数
  14. }
  15. }
  1. # task.py
  2. import time
  3. from celery_app import app
  4. @app.task
  5. def add(x, y):
  6. time.sleep(2)
  7. return x + y
  1. # client.py
  2. from celery_app import task
  3. task1.add.apply_async(args=[2, 8])
  4. # 也可用 task.add.delay(2, 8)

$ celery -B -A celery_app worker --loglevel=info 定时任务启动指定-B参数

参考

异步任务神器 Celery 简明笔记
官方文档


添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注