【Celery实践二】在Flask项目中使用Celery

2022-01-11 10:42:27 浏览数 (1)

背景

上篇我们介绍了Celery的环境搭建以及基础入门,这篇主要分享如何在Python Flask项目中使用。

步骤

1、新建flask项目,目录结构如下

Common目录下存放model层做数据库关系映射以及公共方法

Config目录下存放项目配置以及celery配置

Controllers目录下存放业务控制方法以及注册路由

Tasks目录下存放异步任务方法

具体代码如下:

  • Celery_settings.py
代码语言:javascript复制
# celery配置CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区CELERY_ENABLE_UTC = False  # 禁用UTC,配合CELERY_TIMEZONE使用BROKER_URL = "amqp://yyyyy:xxxxxxxxxx@192.168.a.bb:5672/"  # broker地址CELERY_RESULT_BACKEND = "yyyyy://:xxxxxxxxxx@192.168.3.53:6379/0"  # result地址CELERY_ROUTES = {    'run_api_job_delay': {'queue':'job1'},    'run_ui_job_delay': {'queue':'job2'},}# 不同任务队列配置
  • Settings.py
代码语言:javascript复制
#公用配置DEBUG = TrueSQLALCHEMY_ECHO = FalseDB_HOST="192.168.a.bb"DB_USER="root"DB_PASSWORD="xxxxxxxxxx"SQLALCHEMY_DATABASE_URI="mysql pymysql://" DB_USER ":" DB_PASSWORD "@" DB_HOST ":3306/rpa"SQLALCHEMY_TRACK_MODIFICATIONS = TrueSECRET_KEY = "xxxxxxxxx"CORS_ALLOW_CREDENTIALS = TrueCORS_ORIGIN_ALLOW_ALL = TrueCSRF_ENABLED = True
  • Run_job.py
代码语言:javascript复制
from flask importBlueprintfrom flask import jsonifyfrom flask_restful import reqparsefrom tasks.tasks import run_job_delay
runJob_page = Blueprint("runJob_page", __name__)
# 执行/调试场景测试@runJob_page.route('/run_job', methods=['POST'])#指定路由def run_job():    parser = reqparse.RequestParser()    parser.add_argument('job_id',type=int)    args = parser.parse_args()    job_id = args.get('job_id')    _save_run(job_id)    return jsonify({'msg':"ok", "remark": "任务开始执行"})    def _save_run(job_id):    run_job_delay.delay(job_id)
  • Tasks.py
代码语言:javascript复制
from application importceleryfrom celery.utils.log import get_task_logger
logger = get_task_logger(__name__)#日志输出@celery.task(name='run_api_job_delay')def run_api_job_delay(job_id):    print('执行异步任务')
  • Application.py
代码语言:javascript复制
from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyfrom flask_cors import *from celery import Celeryfrom config import celery_settingfrom flask_httpauth import HTTPBasicAuth
app = Flask(__name__)#实例化应用对象celery = Celery(app.name)# 创建celery实例celery.config_from_object(celery_setting)#读取celery配置CORS(app, supports_credentials=True)app.config.from_pyfile("config\settings.py")db = SQLAlchemy(app)auth = HTTPBasicAuth()
  • manager.py
代码语言:javascript复制
from application importapp, managerfrom flask_script import Commandfrom www import *from gevent import pywsgi
# create_table@Commanddef create_all():    from application import db    db.create_all()    manager.add_command("create_all", create_all)
if __name__ == "__main__":    # 测试    app.run(host='0.0.0.0', debug=True,threaded=True, port=8888)    # 生产    # server =pywsgi.WSGIServer(('192.168.a.bb', 5000), app)    # server.serve_forever()

2、创建worker项目

配置项和server项目相同

Controllers/runJob.py

代码语言:javascript复制
celery =Celery('worker', broker=settings.BROKER_URL, backend=settings.RESULT_BACKEND)#实例化对象
@celery.task(name='run_job_delay')def run_job_delay(job_id):   Run_job(job_Id)

3、启动server项目

代码语言:javascript复制
python manager.py

4、启动worker项目

Q参数可以指定监听队列

代码语言:javascript复制
celery worker -A worker -l info -P eventlet -Q job1 

5、工作流简述

    请求run_job接口,通过url映射到对应view函数;view函数执行业务处理后推送异步方法到指定队列;worker监听指定队列中消息并消费,将结果保存;

    如果平台是综合多种类型的自动化任务并且需要指定worker消费的话,流转应该是下图这样。

    例如worker1部署接口自动化执行服务,worker2部署UI自动化执行服务。

最后

整体来讲Celery使用上手难度 ★★☆☆☆,容易出问题的地方一般在启动时:worker 以及 -A 后边路径,下篇分享如何使用Celery实现动态定时任务的配置。

0 人点赞