@nemos
2017-05-05T15:01:42.000000Z
字数 1273
阅读 943
py pkg
分布式队列,用于执行异步任务
# tasks.pyimport timefrom celery import Celerybroker = 'redis://127.0.0.1:6379' # 消息中间用于消息队列backend = 'redis://127.0.0.1:6379/0' # 结果存储# 创建celery实例app = Celery('my_task', broker=broker, backend=backend)@app.task # 声明需要执行的任务def add(x, y):time.sleep(5) # 模拟耗时操作return x + y
celery worker -A tasks --loglevel=info 启动实例
from tasks import addresult = add.delay(2, 6) # 异步执行若任务,不会阻塞result.ready() # 使用 ready() 判断任务是否执行完毕Falseresult.get() # 使用 get() 获取任务结果8

应用目录
celery_demo # 项目根目录├── celery_app # 存放 celery 相关文件│ ├── __init__.py│ ├── celeryconfig.py # 配置文件│ └── task.py # 任务文件└── client.py # 应用程序
# __init__.pyfrom celery import Celeryapp = Celery('demo') # 创建 Celery 实例app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块
# celeryconfig.pyBROKER_URL = 'redis://127.0.0.1:6379' # 指定 BrokerCELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 BackendCELERY_TIMEZONE='Asia/Shanghai' # 指定时CELERY_IMPORTS = ( # 指定导入的任务模块'celery_app.task',)# 用于定时任务CELERYBEAT_SCHEDULE = {'add-every-30-seconds': {'task': 'celery_app.task.add','schedule': timedelta(seconds=30), # 每 30 秒执行一次'args': (5, 8) # 任务函数参数}}
# task.pyimport timefrom celery_app import app@app.taskdef add(x, y):time.sleep(2)return x + y
# client.pyfrom celery_app import tasktask1.add.apply_async(args=[2, 8])# 也可用 task.add.delay(2, 8)
$ celery -B -A celery_app worker --loglevel=info 定时任务启动指定-B参数