@delight
2015-01-10T06:27:15.000000Z
python celery redis
install:
pip install celery
Choose and install a broker; here we assume Redis:
apt-get install redis-server
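The celery.py example further down imports its broker URL from a settings module (from settings import CELERY_BROKER). With Redis as the broker that module can be as small as the sketch below; the host, port and database number are assumptions for a default local Redis install, so adjust them to your deployment. Note that Celery also needs a Python Redis client to talk to the broker, which pip install redis (or the celery[redis] bundle) takes care of.

# settings.py -- minimal sketch, assuming Redis running locally on its
# default port; replace the URL with your actual Redis host/port/db
CELERY_BROKER = 'redis://localhost:6379/0'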
First, read the Getting Started part of the official Celery documentation carefully; if you have time, it is best to read the whole thing through once...
Then read some of the best-practices write-ups other people have published, and you are basically ready to get to work.
A few warnings and gotchas worth knowing up front:

- To run workers as root you need platforms.C_FORCE_ROOT = True, but it is best not to use root at all.
- To silence the pickle warning, explicitly set CELERY_ACCEPT_CONTENT = ['pickle', ].
- Set BROKER_HEARTBEAT = 10 to get rid of the heartbeat-related warnings.
- Task routes do not support wildcards; if you need them, write your own custom Router class.

Below is an example celery.py:
from __future__ import absolute_import

from celery import Celery, platforms
from settings import CELERY_BROKER
from kombu import Queue, Exchange


class MyRouter(object):
    '''router for tasks using wildcard'''

    def route_for_task(self, task, *args, **kwargs):
        if task.startswith('writer'):
            return {'queue': 'async_writer', 'routing_key': 'async_writer'}
        elif task.startswith('caller'):
            return {'queue': 'async_caller', 'routing_key': 'async_caller'}
        else:
            return {'queue': 'default', 'routing_key': 'default'}


QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('async_writer', Exchange('async_writer'),
          routing_key='async_writer'),
    Queue('async_caller', Exchange('async_caller'),
          routing_key='async_caller'),
)

platforms.C_FORCE_ROOT = True

app = Celery('async',
             broker=CELERY_BROKER,
             include=['async.writer', 'async.caller', 'async.checker', ])

app.conf.update(
    CELERY_ACCEPT_CONTENT=['pickle', ],
    CELERY_IGNORE_RESULT=True,
    CELERY_DISABLE_RATE_LIMITS=True,
    CELERY_DEFAULT_EXCHANGE='default',
    CELERY_DEFAULT_QUEUE='default',
    CELERY_DEFAULT_ROUTING_KEY='default',
    CELERY_DEFAULT_EXCHANGE_TYPE='topic',
    CELERY_TASK_SERIALIZER='pickle',
    CELERY_RESULT_SERIALIZER='pickle',
    BROKER_HEARTBEAT=10,
    CELERY_QUEUES=QUEUES,
    CELERY_ROUTES=(MyRouter(),),
)

if __name__ == "__main__":
    app.start()
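One caveat about MyRouter: it routes on the task name, and by default Celery names a task after its module and function, so a task defined in async/writer.py would be called async.writer.something and fall through to the default queue instead of matching the 'writer' prefix. Giving tasks an explicit name= that starts with the prefix keeps the router happy. The module below is a hypothetical sketch of async/writer.py, one of the modules listed in include; the task name and body are illustrative assumptions, not code from the original project, and it assumes celery.py lives in the async package.

# async/writer.py -- hypothetical task module
from __future__ import absolute_import

from async.celery import app  # the app object defined in celery.py above


# explicit name: the 'writer' prefix is what MyRouter matches on,
# so this task ends up in the 'async_writer' queue
@app.task(name='writer.save_record')
def save_record(record):
    # hypothetical body -- whatever the real writer task does goes here
    print('writing record: %r' % (record,))

An alternative is to keep the default task names and match on the module path instead (task.startswith('async.writer')), which saves having to name every task by hand.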
The official init.d script is not very pleasant to use; below is a hand-written one for reference:
#!/bin/bash
#
# PushserverCore init script
#
### BEGIN INIT INFO
# Provides:          PushserverCore
# Required-Start:    $remote_fs $network $syslog
# Required-Stop:     $remote_fs $network $syslog
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Start PushserverCore Service for generic init daemon
# Description:       PushserverCore Service thrift Server backend.
### END INIT INFO

NAME="Core Thrift Server"
PROJECT=PushserverCore
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/var/app/enabled/$PROJECT
DESC="PushserverCore"
APP_DIR=/var/app/enabled/$PROJECT/Core
APP_PATH=$APP_DIR/CoreServer.py
CELERY_LOG_PATH=/var/app/log/PushserverCore/celery.log

print_succ(){
    echo "$(tput setaf 2)$(tput bold)DONE$(tput sgr0)"
}

print_fail(){
    echo "$(tput setaf 1)$(tput bold)FAILED$(tput sgr0)"
}

stop_service(){
    echo "stopping $NAME..."
    if pgrep -f $APP_PATH > /dev/null 2>&1; then
        pkill -f $APP_PATH
    fi
    print_succ
}

start_service(){
    if pgrep -f $APP_PATH > /dev/null 2>&1; then
        echo "$NAME service is already running."
        return
    else
        echo "starting $NAME service..."
        nohup python $APP_PATH >/dev/null 2>&1 &
    fi
    sleep 3
    if pgrep -f $APP_PATH > /dev/null 2>&1; then
        print_succ
    else
        print_fail
    fi
}

stop_worker(){
    echo "stopping celery worker..."
    if pgrep -f celery > /dev/null 2>&1; then
        pkill -f celery
    fi
    print_succ
}

start_worker(){
    if pgrep -f celery > /dev/null 2>&1; then
        echo "celery is already running"
        return
    else
        echo "starting celery worker..."
        celery -A async multi start writer caller default \
            -Q:writer async_writer -Q:caller async_caller -Q:default default \
            -c 7 -l INFO --workdir=$APP_DIR --logfile=$CELERY_LOG_PATH
    fi
    sleep 3
    if pgrep -f celery > /dev/null 2>&1; then
        print_succ
    else
        print_fail
    fi
}

check_status(){
    if pgrep -f $APP_PATH > /dev/null 2>&1; then
        echo "$NAME is running"
    else
        echo "$NAME is not running"
    fi
    if pgrep -f celery > /dev/null 2>&1; then
        echo "celery worker is running"
    else
        echo "celery worker is not running"
    fi
}

set -e
. /lib/lsb/init-functions

case "$1" in
    start)
        echo "Starting $DESC..."
        start_service
        start_worker
        ;;
    stop)
        echo "Stopping $DESC..."
        stop_service
        stop_worker
        ;;
    restart)
        echo "Restarting $DESC..."
        stop_service
        stop_worker
        sleep 3
        start_service
        start_worker
        echo "Checking..."
        check_status
        ;;
    status)
        check_status
        ;;
    *)
        echo "Usage: $NAME {start|stop|restart|status}" >&2
        exit 1
        ;;
esac

exit 0
The part worth paying close attention to is the usage of celery multi start. What follows start are the names of the worker nodes (the workers that fetch tasks from the broker); you can also simply write a number such as 3 to get numbered nodes. Then -Q:node_name queue_name binds a queue to a node, and -c sets the number of actual worker processes (the ones doing the work) per node. The command in the example starts three nodes named writer, caller and default, consuming three queues named async_writer, async_caller and default respectively, with 7 processes allocated to each node to do the work.
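The workers started this way only have something to do once CoreServer.py, which the init script launches alongside them, actually puts tasks on these queues. The post does not show that code, so the snippet below is only a hedged sketch of what the dispatch side could look like, reusing the hypothetical writer.save_record task from earlier; the record contents are made up.

# inside CoreServer.py (hypothetical) -- hand work off to the celery workers
from async.writer import save_record
from async.celery import app

# goes through MyRouter, lands on the 'async_writer' queue,
# and is picked up by the 'writer' node started by celery multi
save_record.delay({'uid': 42, 'payload': 'hello'})

# equivalent dispatch by task name, without importing the task module
app.send_task('writer.save_record', args=({'uid': 42, 'payload': 'hello'},))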