@nemos
2017-05-05T15:01:42.000000Z
字数 1273
阅读 913
py
pkg
分布式队列,用于执行异步任务
# tasks.py
import time
from celery import Celery
broker = 'redis://127.0.0.1:6379' # 消息中间用于消息队列
backend = 'redis://127.0.0.1:6379/0' # 结果存储
# 创建celery实例
app = Celery('my_task', broker=broker, backend=backend)
@app.task # 声明需要执行的任务
def add(x, y):
time.sleep(5) # 模拟耗时操作
return x + y
celery worker -A tasks --loglevel=info
启动实例
from tasks import add
result = add.delay(2, 6) # 异步执行若任务,不会阻塞
result.ready() # 使用 ready() 判断任务是否执行完毕
False
result.get() # 使用 get() 获取任务结果
8
应用目录
celery_demo # 项目根目录
├── celery_app # 存放 celery 相关文件
│ ├── __init__.py
│ ├── celeryconfig.py # 配置文件
│ └── task.py # 任务文件
└── client.py # 应用程序
# __init__.py
from celery import Celery
app = Celery('demo') # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块
# celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai' # 指定时
CELERY_IMPORTS = ( # 指定导入的任务模块
'celery_app.task',
)
# 用于定时任务
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'celery_app.task.add',
'schedule': timedelta(seconds=30), # 每 30 秒执行一次
'args': (5, 8) # 任务函数参数
}
}
# task.py
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
# client.py
from celery_app import task
task1.add.apply_async(args=[2, 8])
# 也可用 task.add.delay(2, 8)
$ celery -B -A celery_app worker --loglevel=info
定时任务启动指定-B参数