背景
上篇我们介绍了Celery的环境搭建以及基础入门,这篇主要分享如何在Python Flask项目中使用。
步骤
1、新建flask项目,目录结构如下
Common目录下存放model层做数据库关系映射以及公共方法
Config目录下存放项目配置以及celery配置
Controllers目录下存放业务控制方法以及注册路由
Tasks目录下存放异步任务方法
具体代码如下:
- Celery_settings.py
# celery配置CELERY_TIMEZONE = 'Asia/Shanghai' # 时区CELERY_ENABLE_UTC = False # 禁用UTC,配合CELERY_TIMEZONE使用BROKER_URL = "amqp://yyyyy:xxxxxxxxxx@192.168.a.bb:5672/" # broker地址CELERY_RESULT_BACKEND = "yyyyy://:xxxxxxxxxx@192.168.3.53:6379/0" # result地址CELERY_ROUTES = { 'run_api_job_delay': {'queue':'job1'}, 'run_ui_job_delay': {'queue':'job2'},}# 不同任务队列配置
- Settings.py
#公用配置DEBUG = TrueSQLALCHEMY_ECHO = FalseDB_HOST="192.168.a.bb"DB_USER="root"DB_PASSWORD="xxxxxxxxxx"SQLALCHEMY_DATABASE_URI="mysql pymysql://" DB_USER ":" DB_PASSWORD "@" DB_HOST ":3306/rpa"SQLALCHEMY_TRACK_MODIFICATIONS = TrueSECRET_KEY = "xxxxxxxxx"CORS_ALLOW_CREDENTIALS = TrueCORS_ORIGIN_ALLOW_ALL = TrueCSRF_ENABLED = True
- Run_job.py
from flask importBlueprintfrom flask import jsonifyfrom flask_restful import reqparsefrom tasks.tasks import run_job_delay
runJob_page = Blueprint("runJob_page", __name__)
# 执行/调试场景测试@runJob_page.route('/run_job', methods=['POST'])#指定路由def run_job(): parser = reqparse.RequestParser() parser.add_argument('job_id',type=int) args = parser.parse_args() job_id = args.get('job_id') _save_run(job_id) return jsonify({'msg':"ok", "remark": "任务开始执行"}) def _save_run(job_id): run_job_delay.delay(job_id)
- Tasks.py
from application importceleryfrom celery.utils.log import get_task_logger
logger = get_task_logger(__name__)#日志输出@celery.task(name='run_api_job_delay')def run_api_job_delay(job_id): print('执行异步任务')
- Application.py
from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyfrom flask_cors import *from celery import Celeryfrom config import celery_settingfrom flask_httpauth import HTTPBasicAuth
app = Flask(__name__)#实例化应用对象celery = Celery(app.name)# 创建celery实例celery.config_from_object(celery_setting)#读取celery配置CORS(app, supports_credentials=True)app.config.from_pyfile("config\settings.py")db = SQLAlchemy(app)auth = HTTPBasicAuth()
- manager.py
from application importapp, managerfrom flask_script import Commandfrom www import *from gevent import pywsgi
# create_table@Commanddef create_all(): from application import db db.create_all() manager.add_command("create_all", create_all)
if __name__ == "__main__": # 测试 app.run(host='0.0.0.0', debug=True,threaded=True, port=8888) # 生产 # server =pywsgi.WSGIServer(('192.168.a.bb', 5000), app) # server.serve_forever()
2、创建worker项目
配置项和server项目相同
Controllers/runJob.py:
代码语言:javascript复制celery =Celery('worker', broker=settings.BROKER_URL, backend=settings.RESULT_BACKEND)#实例化对象
@celery.task(name='run_job_delay')def run_job_delay(job_id): Run_job(job_Id)
3、启动server项目
代码语言:javascript复制python manager.py
4、启动worker项目
Q参数可以指定监听队列
代码语言:javascript复制celery worker -A worker -l info -P eventlet -Q job1
5、工作流简述
请求run_job接口,通过url映射到对应view函数;view函数执行业务处理后推送异步方法到指定队列;worker监听指定队列中消息并消费,将结果保存;
如果平台是综合多种类型的自动化任务并且需要指定worker消费的话,流转应该是下图这样。
例如worker1部署接口自动化执行服务,worker2部署UI自动化执行服务。
最后
整体来讲Celery使用上手难度 ★★☆☆☆,容易出问题的地方一般在启动时:worker 以及 -A 后边路径,下篇分享如何使用Celery实现动态定时任务的配置。