01
任务类别简介
任务进程为后台作业提供了一个便捷的解决方案。Worker过程独立于应用程序运行,甚至可以位于不同的系统上。应用程序和worker之间的通信是通过消息完成的。通过与物理相互作用来监视其进度。下图展示了一个典型的实现:
Python中最流行的任务类别是Celery。这是一个相当复杂的重叠,它有很多选项并支持多个消息示例。另一个流行的Python任务位置是Redis Queue(RQ),它牺牲了一些替代,,仅支持Redis消息本身,但作为交换,它的建立要比Celery简单长度
Celery和RQ都非常适合在Flask应用程序中支持后台任务,所以我可以选择更简单的RQ。不过,用Celery实现相同的功能其实也不难。如果您对Celery更有吸引力,可以阅读我的博客中的将Celery与Flask文章一起使用
02
使用RQ
RQ是一个标准的Python三方重叠,用pip
安装:
(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt
正如我前面提到的,应用和RQ worker之间的通信将在Redis消息中执行,因此你需要运行Redis服务器。有很多途径来安装和运行Redis服务器,可以下载其内核并执行编译和安装。如果你使用的是Windows中,微软在此处维护了Redis的的安装程序。在Linux的上,你可以通过操作系统的软件包管理器安装Redis的。Mac OS X的用户可以运行brew install redis
,使用然后redis-server
命令手动启动服务
除了确保服务正在运行并可以识别RQ访问之外,你不需要与Redis进行其他交互
03
创建任务
一个任务,不过是一个Python函数而已。以下是一个示例任务,我将其引入一个新的app / tasks.py模块:
app / tasks.py:示例后台任务
代码语言:javascript复制import time
def example(seconds):
print('Starting task')
for i in range(seconds):
print(i)
time.sleep(1)
print('Task completed')
该任务将秒数作为参数,然后在该时间量内等待,并每秒打印一次计数器
04
运行 RQ 工人
任务准备就绪,可以通过rq worker
来启动一个worker进程了:
(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...
microblog-tasks
如果您想启动多个worker来扩展量子,您只需要运行rq worker
来生成更多连接到同一个模型的进程,就可以使用Worker进程现在连接到了Redis,并在称为的上面上查看可能的分配给它的任何作业。在生产环境中,您可能希望至少运行可用的CPU数量的工人。。然后,,当作业出现在特定位置时,任何可用的worker进程都可以获取它
05
执行任务
现在打开第二个终端窗口并激活虚拟环境。我将使用shell会话来启动worker中的example()
任务:
>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'
如果采用的是Redis服务器运行在不同的主机或端口号上,则使用RQ的Queue
类表示从应用程序端看到的任务类型。Redis
则需要使用其他URL。
队列的enqueue()
方法用于将作业添加到队列中。第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。我发现传入字符串更加方便,因为不需要在应用程序对enqueue()
预期的任何剩余参数将被传递给worker中运行的函数。
enqueue()
只要进行了调用,运行着RQ worker的终端窗口上就会出现一些活动。你会看到example()
函数正在运行,并且连续打印一次计数器。同时,你的其他终端不会被分开,你可以继续在shell在上面的示例中,我调用job.get_id()
方法来获取分配给任务的唯一标识符。你可以尝试使用另一个有趣表达式来检查worker上的函数是否已完成:
>>> job.is_finished
False
如果你像我在上面的示例中那样传递了23
,那么函数将运行约23秒。在那之后,job.is_finished
表达式将True
转化为。就是这么简单,炫酷否?
一旦函数完成,worker又回到等待作业的状态,所以如果你想进行更多的实验,你可以用不同的参数重复执行enqueue()
调用。),但最终会被删除。这很重要,任务类别不保留已执行作业的历史记录
06
报告任务进度
通常,对于长期运行的任务,您需要将一些进度信息提供给应用程序,从而可以将其显示给用户。RQ通过使用作业对象的meta
属性来支持这一点。让我重新编写example()
任务来编写进度报告:
app / tasks.py::带进度的示例后台任务
代码语言:javascript复制import time
from rq import get_current_job
def example(seconds):
job = get_current_job()
print('Starting task')
for i in range(seconds):
job.meta['progress'] = 100.0 * i / seconds
job.save_meta()
print(i)
time.sleep(1)
job.meta['progress'] = 100
job.save_meta()
print('Task completed')
这个新版本的example()
使用RQ的get_current_job()
函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。作业对象的meta
属性是一个字典,任务可以编写任何想要的与应用程序通信的自定义数据。在此示例中,我编写了progress
,表示完成任务的百分比。每次进程更新时,我都调用job.save_meta()
指示RQ将数据写入Redis,应用程序可以在其中找到它。
在应用程序方面(目前只是一个Python shell),我可以运行此任务,然后监视进度,如下所示:
代码语言:javascript复制>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True
如您所见,在另一侧,meta
属性可以被重新读。需要调用refresh()
方法来从Redis更新内容
07
任务的数据库表示
对于Web应用程序,情况会变得更复杂一些,因为一旦任务传递请求的处理而启动,该请求随即结束,而该任务因为我希望应用程序跟踪每个用户正在运行的任务,所以我需要使用数据库表来维护状态。你可以在下面看到新的Task
模型实现:
app / models.py:任务模型
代码语言:javascript复制# ...
import redis
import rq
class User(UserMixin, db.Model):
# ...
tasks = db.relationship('Task', backref='user', lazy='dynamic')
# ...
class Task(db.Model):
id = db.Column(db.String(36), primary_key=True)
name = db.Column(db.String(128), index=True)
description = db.Column(db.String(128))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
complete = db.Column(db.Boolean, default=False)
def get_rq_job(self):
try:
rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
return None
return rq_job
def get_progress(self):
job = self.get_rq_job()
return job.meta.get('progress', 0) if job is not None else 100
模型这个状语从句:以前的模型有一个有趣的区别的英文id
主键字段的英文字符串类型,而不是整数类型。这是因为对于这个模型,我不会依赖数据库自己的主键生成,而是使用由RQ生成的作业标识符。
该模型将存储符合任务命名规范的名称(会传递给RQ),适用于向用户显示的任务描述,该任务的所属用户的关系以及任务是否已完成的布尔值。complete
字段的目的是将正在运行的任务与已完成的任务分开,因为运行中的任务需要特殊处理才能显示最新进度。
get_rq_job()
辅助方法可以用给定的任务ID加载RQ Job
实例。的英文这通过Job.fetch()
完成的,它会从Redis的存在中的数据中加载Job
实例。get_progress()
方法建立在get_rq_job()
的基础之上,并返回任务的进度百分比。该方法做一些有趣的假设,如果模型中的作业ID不存在于RQ变量中,则表示作业已完成和数据已过期并已从该中删除,因此在这种情况下返回的百分比为100。同时,如果job存在,但'meta'属性中找到进度相关的信息,那么可以安全地进行该作业计划运行,但还没有启动,所以在这种情况下进度是0。
改进更改数据库,需要生成新的迁移,然后升级数据库:
代码语言:javascript复制(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade
新模型也可以添加到shell一部分中,盔甲在shell会话中访问它时无需导入:
microblog.py:添加任务模型到shell上下文中
代码语言:javascript复制from app import create_app, db, cli
from app.models import User, Post, Message, Notification, Task
app = create_app()
cli.register(app)
@app.shell_context_processor
def make_shell_context():
return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
'Notification': Notification, 'Task': Task}
08
将RQ与 Flask 集合在一起
Redis服务的连接URL需要添加到配置中:
代码语言:javascript复制class Config(object):
# ...
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
与往常一样,Redis连接URL将来自环境变量,如果该变量未定义,则替换为该服务在当前主机的端口上运行并使用URL。
应用工厂函数将负责初始化Redis和RQ:
app / __ init__.py:整合RQ
代码语言:javascript复制# ...
from redis import Redis
import rq
# ...
def create_app(config_class=Config):
# ...
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
# ...
app.task_queue
将成为提交任务的重量。将附加到应用上会提供很大的便利,因为我可以在应用的任何地方使用current_app.task_queue
来访问它。为了方便应用的任何部分提交或检查任务,我可以在User
模型中创建一些辅助方法:
app / models.py:用户模型中的任务辅助方法
代码语言:javascript复制# ...
class User(UserMixin, db.Model):
# ...
def launch_task(self, name, description, *args, **kwargs):
rq_job = current_app.task_queue.enqueue('app.tasks.' name, self.id,
*args, **kwargs)
task = Task(id=rq_job.get_id(), name=name, description=description,
user=self)
db.session.add(task)
return task
def get_tasks_in_progress(self):
return Task.query.filter_by(user=self, complete=False).all()
def get_task_in_progress(self, name):
return Task.query.filter_by(name=name, user=self,
complete=False).first()
launch_task()
方法是将任务提交到RQ,然后将其添加到数据库中。name
参数是函数名称,如app / tasks.py中所定义的那样。提交给RQ时,该函数已app.tasks.
预先添加到该名称中以构建符合规范的函数名称。description
参数是对呈现给用户的任务的友好描述。对于导出用户动态的函数,我将名称设置为export_posts
,将描述设置为Exporting posts...
。其余参数将传递给任务函数。launch_task()
函数首先调用队列的enqueue()
方法来提交作业。返回的作业对象包含由RQ分配的任务ID,因此我可以使用它在我的数据库中创建相应的Task
对象
请注意,launch_task()
将新的任务对象添加到会话中,但不会发出提交。替代,最好在更高层次函数中的数据库会话上进行操作,因为它允许您在替代事务中组合由替代这不是一个严格的规则,并且,在本章后面的子函数中也会存在一个例外的提交
get_tasks_in_progress()
方法返回该用户未完成任务的列表。稍后您会看到,我使用此方法在将有关正在运行的任务的信息渲染到用户的页面中
最后,get_task_in_progress()
是上一个方法的简化版本并返回指定的任务。我阻止用户同时启动两个或多个相同类型的任务,因此在启动任务之前,可以使用此方法来确定前一个任务是否还在运行
09
利用 RQ 任务发送电子邮件
不要认为本节偏离主题,我在上面说过,当后台完成任务完成时,将使用包含所有用户动态的JSON文件向用户发送电子邮件。我在第十章中生成的电子邮件功能需要通过两种方式进行扩展。首先,我需要添加对文件附件的支持,刹车我可以附加JSON文件。串行,send_email()
函数总是使用后台线程初始化发送电子邮件。当我要从后台任务发送电子邮件时(已经是初步的了),基于线程的二级后台任务没有什么意义,所以我需要同时支持同步和异步电子邮件的发送。
幸运的是,Flask-Mail支持附件,所以我需要做的就是扩展send_email()
函数的控件关键字参数,然后在Message
对象中配置它。选择在前台发送电子邮件时,我只需要添加一个sync=True
的关键字参数即可:
app / email.py:发送带附件的邮件
代码语言:javascript复制# ...
def send_email(subject, sender, recipients, text_body, html_body,
attachments=None, sync=False):
msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body
msg.html = html_body
if attachments:
for attachment in attachments:
msg.attach(*attachment)
if sync:
mail.send(msg)
else:
Thread(target=send_async_email,
args=(current_app._get_current_object(), msg)).start()
消息类的attach()
方法接受三个定义附件的参数:文件名,媒体类型和实际文件数据。文件名就是收件人看到的与附件关联的名称。媒体类型定义了这种附件的类型,这有助于电子邮件读者适当地渲染它。例如,如果您发送为image/png
媒体类型,则电子邮件阅读器会知道该附件是一个图像,在这种情况下,它可以显示它。对于用户动态数据文件,我将使用JSON格式,该格式使用application/json
媒体类型。最后一个参数包含附件内容的字符串或字节序列。
简单来说,send_email()
的attachments
参数将成为一个元组列表,每个元组将有三个元素对应于attach()
的三个参数。因此,我需要转换列表中的每个元素作为参数发送给attach()
。在Python中,如果你想将列表或元组中的每个元素作为参数传递给函数,你可以使用func(*args)
将这个列表或元祖解包成函数中的多个参数,而不必枯燥地一个个地传递,如func(args[0], args[1], args[2])
。例如,如果如果没有,调用将会引发一个参数,即列表。你有一个列表args = [1, 'foo']
,func(*args)
将会传递两个参数,就和你调用func(1, 'foo')
一样。*args
如电子邮件的同步发送,我需要做的就是,当sync
是True
的时候恢复成调用mail.send(msg)
10
任务助手
尽管我上面使用的example()
任务是一个简单的独立函数,但已添加用户动态的函数却需要应用中具有的某些功能,例如访问数据库和发送电子邮件。因为这将在单独的进程中运行,所以我需要初始化Flask-SQLAlchemy和Flask-Mail,而Flask-Mail又需要Flask应用程序实例以从中获取它们的配置。因此,我将在app / tasks.py模块的顶部添加Flask应用程序实例和应用程序:
app / tasks.py:创建应用及其自身
代码语言:javascript复制from app import create_app
app = create_app()
app.app_context().push()
当使用flask
命令时,根目录中的microblog.py模块创建应用实例,但RQ worker实际上却一无所知,所以当任务函数时,应用程序在此模块中创建,因为这是RQ worker要导入的唯一模块。你已经在好几个地方app.app_context()
看到了方法,按下一个使其使应用成为“当前”的应用实例,这样一来Flask-SQLAlchemy等插件才可以使用current_app.config
获取它们的配置。。根本没有,current_app
表达式会返回一个错误。
然后我开始考虑如何在这个函数运行时报告进度。另外通过job.meta
字典传递进度信息之外,我还想将通知推送给客户端,刹车自动动态更新完成百分比。逐步,我将使用我在第二十一章中生成的通知机制。更新将以与未读消息徽章非常类似的方式工作。当服务器渲染模板时,则包含从job.meta
获得的“静态”进度信息,但一旦页面置于客户端的浏览器中,通知将使用通知来动态更新百分比。由于通知的原因,更新正在运行的任务的进度将比上一个示例中的操作稍微多一些,所以我将创建一个专用于更新进度的包装函数:
app / tasks.py:设置任务进度
代码语言:javascript复制from rq import get_current_job
from app import db
from app.models import Task
# ...
def _set_task_progress(progress):
job = get_current_job()
if job:
job.meta['progress'] = progress
job.save_meta()
task = Task.query.get(job.get_id())
task.user.add_notification('task_progress', {'task_id': job.get_id(),
'progress': progress})
if progress >= 100:
task.complete = True
db.session.commit()
任务导出可以调用_set_task_progress()
来记录进度百分比。函数该首先将百分比写入job.meta
字典搜索并将其保存到Redis的,然后从数据库加载相应的任务对象,使用并task.user
已有的add_notification()
方法将通知推送给请求该任务的用户。通知将被命名为task_progress
,并且伴随关联的数据将成为具有两个关联的字典:任务标识符和进度数值。稍后我将添加JavaScript代码来处理这种新的通知类型
该函数查看进度来确认任务函数是否已完成,并在这种情况下下更新数据库中任务对象的complete
属性。数据库提交调用通过add_notification()
添加的任务和通知对象都立即保存到数据库。任务,确保不执行任何数据库更改,因为执行本次调用父父的更改也写入数据库
11
实现导出任务
现在所有的准备工作已经完成,可以开始编写导出函数了。这个函数的高层结构如下:
app / tasks.py:导出用户动态通用结构
代码语言:javascript复制def export_posts(user_id):
try:
# read user posts from database
# send email with data to user
except:
# handle unexpected errors
请求处理器中的应用程序可以防止意外错误,因为Flask自身捕获异常,然后将其整个任务包装在try / except中。将运行在由RQ控制的单独前进中,而不是烧瓶,因此如果发生任何意外错误,任务将中止,RQ将向控制台显示错误,然后返回等待新的作业。worker的输出或将其记录到文件中,否则将永远不会发现有错误。
让我们从上面带有注释的三部分中最简单的错误处理部分开始梳理:
app / tasks.py:更新用户动态错误处理
代码语言:javascript复制import sys
# ...
def export_posts(user_id):
try:
# ...
except:
_set_task_progress(100)
app.logger.error('Unhandled exception', exc_info=sys.exc_info())
发生意外错误时,我将通过将进度设置为100%来将任务标记为完成,然后使用Flask应用程序中的日志记录器对象记录错误以及如何跟踪信息(调用sys.exc_info()
来获得)。记录器来记录错误的好处在于,你可以观察到你为瓶应用实现的任何日志记录机制。例如,在第七章中,我配置了要发送到管理员电子邮件地址的错误。只要使用app.logger
,我也可以得到这些错误信息
接下来,我将编写实际的起始代码,它只需发出一个数据库查询并在循环中遍历结果,随之而来的累积在字典中:
app / tasks.py:从数据库读取用户动态
代码语言:javascript复制import time
from app.models import User, Post
# ...
def export_posts(user_id):
try:
user = User.query.get(user_id)
_set_task_progress(0)
data = []
i = 0
total_posts = user.posts.count()
for post in user.posts.order_by(Post.timestamp.asc()):
data.append({'body': post.body,
'timestamp': post.timestamp.isoformat() 'Z'})
time.sleep(5)
i = 1
_set_task_progress(100 * i // total_posts)
# send email with data to user
except:
# ...
时间格式将采用ISO 8601标准。我使用的Python的datetime
对象不存储时区,因此在以ISO格式导出时间后,我添加了'Z',它表示UTC
我维护了一个计数器i
,并且在进入循环之前还需要发出一个额外的数据库查询,查询total_posts
导致用户动态的总数。使用了i
和total_posts
,在每个循环迭代我都可以使用从0到100的数字来更新任务进度
您可能会好奇我为什么会在每个循环time.sleep(5)
迭代中加入调用。最终是我想要延长增量所需的时间,刹车在用户动态不多的情况下也可以方便地查看到逐步进度的增长
下面是函数的最后部分,将会带上data
附件发送邮件给用户:
app / tasks.py:发送带用户动态的邮件给用户
代码语言:javascript复制import json
from flask import render_template
from app.email import send_email
# ...
def export_posts(user_id):
try:
# ...
send_email('[Microblog] Your blog posts',
sender=app.config['ADMINS'][0], recipients=[user.email],
text_body=render_template('email/export_posts.txt', user=user),
html_body=render_template('email/export_posts.html', user=user),
attachments=[('posts.json', 'application/json',
json.dumps({'posts': data}, indent=4))],
sync=True)
except:
# ...
只是其实对send_email()
函数的调用。附件被定义为一个元组,其中有三个元素被传递给瓶邮件的Message
对象的attach()
方法。元组中的第三个元素是附件内容,它是用Python中的json.dumps()
函数生成的。
这里引用了一对新模板,它们以纯文本和HTML格式提供电子邮件正文的内容。这是文本模板的内容:
app / templates / email / export_posts.txt:更新用户动态文本邮件模板
代码语言:javascript复制Dear {{ user.username }},
Please find attached the archive of your posts that you requested.
Sincerely,
The Microblog Team
这是HTML版本的邮件模板:
app / templates / email / export_posts.html:更新用户动态HTML邮件模板
代码语言:javascript复制<p>Dear {{ user.username }},</p>
<p>Please find attached the archive of your posts that you requested.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>
12
应用中的导出功能
剩下的就是将这个功能连接到应用,刹车用户发起请求并通过电子邮件发送用户动态给他们
下面是新的export_posts
视图函数:
app / main / routes.py:导出用户动态路由和视图函数
代码语言:javascript复制@bp.route('/export_posts')
@login_required
def export_posts():
if current_user.get_task_in_progress('export_posts'):
flash(_('An export task is currently in progress'))
else:
current_user.launch_task('export_posts', _('Exporting posts...'))
db.session.commit()
return redirect(url_for('main.user', username=current_user.username))
该功能首先检查用户是否有未完成的任务,并在这种情况下只是闪现消息。对同一用户同时执行两个发起任务是没有意义的,可以避免。我可以使用前面实现的get_task_in_progress()
方法来检查这种情况
如果一个用户没有正在运行的导出任务,则调用launch_task()
来启动它。第一个参数是将传递给RQ worker的函数的名称,改为为app.tasks.
。第二个参数只是一个友好的文本描述,将会显示给用户。这两个值都会被写入数据库中的任务对象。该函数以重定向到用户个人主页结束
我认为最合适的地方是在用户个人主页,只有在用户查看他们自己的主页时,链接在“编辑个人资料”链接下面显示:
app / templates / user.html:用户个人主页的导出链接
代码语言:javascript复制...
<p>
<a href="{{ url_for('main.edit_profile') }}">
{{ _('Edit your profile') }}
</a>
</p>
{% if not current_user.get_task_in_progress('export_posts') %}
<p>
<a href="{{ url_for('main.export_posts') }}">
{{ _('Export your posts') }}
</a>
</p>
...
{% endif %}
此链接的渲染是有条件的,因为我不希望它在用户已经有发起任务执行时出现。
如果你想尝试一下,你可以按如下方式启动应用和RQ worker:
- 确保Redis正在运行
- :一个终端窗口,启动至少一个RQ worker实例。本处你可以运行命令
rq worker microblog-tasks
- 再打开另一个终端窗口,使用
flask run
(记得先设置FLASK_APP
变量)命令启动Flask应用
13
进度通知
为了完善这个功能,我想在后台任务运行时提醒用户任务完成的进度。在浏览Bootstrap组件选项时,我决定在导航栏的下方使用一个Alert组件。横条。我用蓝色的警报框来渲染闪现的消息。现在我要添加一个绿色的警报框来显示任务进度。样式如下:
app / templates / base.html:基础模板中的导出进度Alert组件
代码语言:javascript复制...
{% block content %}
<div class="container">
{% if current_user.is_authenticated %}
{% with tasks = current_user.get_tasks_in_progress() %}
{% if tasks %}
{% for task in tasks %}
<div class="alert alert-success" role="alert">
{{ task.description }}
<span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
</div>
{% endfor %}
{% endif %}
{% endwith %}
{% endif %}
...
{% endblock %}
...
外部条件在用户未登录时跳过所有与Alert相关的标记。而对于已登录的用户,我通过称为创建的get_tasks_in_progress()
方法来获取当前的任务列表。在当前版本的应用中,我最多只能得到一个结果,因为我可以多个替换任务同时执行,但将来我可能要支持可以共存的其他类型的任务,所以以通用的方式渲染Alert可以节省我以后的时间。
对于每项任务,我都会在页面上渲染一个警报元素。警报的颜色由第二个CSS样式控制,本处是alert-success
,而在闪现消息是alert-info
,引导文档所有游戏有关警报的HTML结构的详细信息。警报文本包括存储在Task
模型中的description
细分,后面跟着完成百分比。
被百分比封装在具有id
属性的<span>
元素中。原因是我要在收到通知时用的JavaScript刷新百分比。我给任务ID附加末尾-progress
来构造id
属性。当有通知到达时,通过其中的任务ID,我可以很容易地使用#<task.id>-progress
选择器找到正确的<span>
元素来更新。
如果您此时进行尝试,则每次导航到新页面时都会看到“静态”的进度更新。您可以注意到,在启动导出任务后,您可以自由导航到应用程序的不同页面,正在运行的任务的状态始终都会展示出来
为了对span>
元素的百分比的动态更新做准备,我将在JavaScript端编写一个辅助函数:
app / templates / base.html:动态更新任务进度的辅助函数
代码语言:javascript复制...
{% block scripts %}
...
<script>
...
function set_task_progress(task_id, progress) {
$('#' task_id '-progress').text(progress);
}
</script>
...
{% endblock %}
这个函数接受一个任务id
和一个进度值,并使用jQuery为这个任务定位<span>
元素,转换为新进度作为其内容写入。无需验证页面上是否存在该元素,因为如果没有找到该元素,jQuery将不会执行任何操作。
app / tasks.py中的_set_task_progress()
函数每次更新进度时调用add_notification()
,就会产生新的通知。而我在第二十一章明智地以完全通用的方式实现了通知功能。所以当浏览器定期向服务器发送时通知更新请求时,浏览器会获得通过add_notification()
方法添加的任何通知
但是,这些JavaScript代码只能识别具有unread_message_count
名称的那些通知,并忽略其余部分。我现在需要做的是扩展该函数,通过调用我上面定义的set_task_progress()
函数来处理task_progress
通知。以下是处理通知更新版本JavaScript代码:
app / templates / base.html:通知处理器
代码语言:javascript复制for (var i = 0; i < notifications.length; i ) {
switch (notifications[i].name) {
case 'unread_message_count':
set_message_count(notifications[i].data);
break;
case 'task_progress':
set_task_progress(
notifications[i].data.task_id,
notifications[i].data.progress);
break;
}
since = notifications[i].timestamp;
}
现在我需要处理两个不同的通知,我决定用一个switch
语句替换检查unread_message_count
通知名称的if
语句,该语句包含我现在需要支持的每个通知。如果你对“ C”系列语言不熟悉,就可能从未见过是switch语句,它提供了一种方便的语法,可以替代一长串的if/elseif
语句。这是一个很棒的特性,因为当我需要支持更多通知时,只需简单地添加case
块即可。
回顾一下,RQ任务附加到task_progress
通知的数据是一个包含两个元素task_id
和progress
的字典,这两个元素是我调用set_task_progress()
的两个参数。
如果您现在运行该应用,则绿色Alert插入的进度指示器将每10秒刷新一次(因为刷新通知的时间间隔是10秒)。
如果您要维护非英语语言文件,则需要使用Flask-Babel刷新翻译文件,然后添加新的翻译:
代码语言:javascript复制(venv) $ flask translate update
如果您使用的是编码翻译,那么我已经为你完成了翻译工作,因此可以从下载包中提取app / translations / es / LC_MESSAGES / messages.po文件,从而将其添加到您的项目中。
翻译文件到位后,还要编译翻译文件:
代码语言:javascript复制(venv) $ flask translate compile