自定义任务引擎Probius上线运行一段时间后,小伙伴跟我反馈有部分任务执行时间太长,等的花儿都谢了,例如下边这个任务竟然执行了超过24分钟
查看每个子任务的执行时间,发现单单消耗在“YARN安装模块”这个子任务的时间就超过20分钟,检查这个子任务的执行逻辑发现并没有发现问题,抛弃Probius,直接在服务器上执行这个子任务惊奇的发现执行时间只有2分钟,由此断定肯定是Probius的问题了
又一次看了下这个子任务,发现任务的输出日志超过1w条,瞬间就知晓了其中的问题,经过简单修改再次测试,原本执行24分钟的任务执行时间缩短至3分钟以内,效率提升相当明显
究竟改了什么拥有如此魔力呢?先来看下下边这段代码
代码语言:javascript复制class Logger:
def __init__(self, tid, state=None):
self.tid = tid
self.state = state
self.datetime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
def add(self, details):
subtasklog = SubTaskLog.objects.get(id=self.tid)
if details:
details = details.replace('n', ' ').replace('r', ' ')
newlog = self.datetime ' ' details ';'
if subtasklog.details:
subtasklog.details = newlog
else:
subtasklog.details = newlog
if self.state is not None:
subtasklog.state = self.state
subtasklog.save()
任务执行会不断的输出日志,这些日志就通过上边的Logger
类写入数据库,以便前端可以及时读取实时展示,乍看上去并无不妥,但当短时间内日志产生量非常大时便会频繁读写数据库,数据库压力过大从而影响整个程序的执行效率。缓解数据库压力的有效方法就是加缓存
其实当初在写这段代码的时候就考虑到了用缓存,之所以没有用的主要是因为:在项目设计的过程中我提倡尽量减少依赖,不过度设计,以实现需求为目标,尽量让项目简单,这样协作的小伙伴看起代码来不费劲,出了问题还容易查找原因。正常情况下任务的日质量都不大,数据库处理起来也不费劲,能满足需求,引入缓存势必要增加依赖,让项目更复杂,所以就没有加。但从上边的问题来看,数据库已无法满足需求,增加缓存就很有必要了
Django Cache
Django本身就带有一个强大的缓存系统,提供不同级别的缓存粒度:可以缓存特定的视图,也可以只缓存部分模板片段,还可以缓存整个网站。但这几类都不是我想要的,本篇文章不会介绍以上几类缓存的使用,需要的话可以参考官网写的很详细
Django同时还提供了底层缓存API,可以使用这个API以任意级别粒度在缓存中存储对象。这正是我所需要的,每次产生的新日志都不再直接写入数据库,而是先写入缓存中,待任务执行完成后一次写入数据库,这样将大大降低对数据库的消耗,且缓存大都使用内存来存储,读写效率极高
缓存配置
Django的底层缓存API使用非常简单,首先需要在配置文件中配置启用缓存,settings.py
文件中添加如下代码
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
'LOCATION': '127.0.0.1:11211',
}
}
这里使用了Memcached
作为缓存服务,Memcached
是一个完全基于内存的缓存服务器,是Django
原生支持的最快、最高效的缓存类型,其他还支持的缓存类型有
- 数据库缓存:django.core.cache.backends.db.DatabaseCache,
LOCATION
为表名 - 文件系统缓存:django.core.cache.backends.filebased.FileBasedCache,
LOCATION
为文件路径 - 本地内存缓存:django.core.cache.backends.locmem.LocMemCache,
LOCATION
被用于标识各个内存存储 - 虚拟缓存:django.core.cache.backends.dummy.DummyCache,仅用于开发模式,只是实现缓存接口,并不做其他操作
- 自定义的缓存后台,例如redis等
我原本是想直接使用本地内存缓存的,这样就无需再安装Memcached
服务了,但是本地内存缓存为进程私有,不可跨进程访问,这就产生了一个问题就是Logger
进程写入内存缓存后,我前端展示的进程读不到,就无法实时输出日志了,遂放弃内存缓存,改用Django支持最好的Memcached
使用Memcached前需要先安装memcached服务,以及python连接memcached的包
代码语言:javascript复制# debian系统安装memcached服务
apt-get install memcached
# 安装python连接memcached的包python-memcached
pip install python-memcached
每个缓存后端都支持配置额外的参数,从而来控制缓存的行为,有效的参数如下:
TIMEOUT: 用于缓存的默认超时时间,以秒为单位,默认为300秒,当设置为None
时表示永不过时,设置为0
表示立刻过期不缓存
KEY_PREFIX: 缓存键前缀,如果有设置,则这个设置的值将自动添加到Django服务器使用的所有缓存键之前
VERSION: 通过Django服务器生成的缓存键的默认版本号,有点类似与Redis的db,以下例子能清晰展示VERSION的作用
代码语言:javascript复制>>> from django.core.cache import cache
>>>
>>> cache.set('site', 'ops-coffee.cn', version=37)
>>>
>>> cache.get('site')
>>>
>>> cache.get('site', version=37)
'ops-coffee.cn'
>>>
OPTIONS: 传递到缓存后端服务的参数,例如我要传递username、password等参数到后端的memcached服务,那么可以这样写
代码语言:javascript复制CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.memcached.PyLibMCCache',
'LOCATION': '127.0.0.1:11211',
'OPTIONS': {
'binary': True,
'username': 'user',
'password': 'pass',
'behaviors': {
'ketama': True,
}
}
}
}
缓存访问
开启Django Cache配置后,就可以使用缓存服务了,基本用法如下
代码语言:javascript复制>>> from django.core.cache import cache
cache.set(key, value, timeout=DEFAULT_TIMEOUT, version=None)
其中key
是一个字符串,value
是一个认可picklable形式的python对象,timeout
和version
参数都是可选的,timeout
默认为CACHES
配置中相应后端的timeout
参数,version
为对应的版本,参考上边关于VERSION
的解释
>>> cache.set('site', 'ops-coffee.cn')
>>>
>>> cache.get('site')
'ops-coffee.cn'
>>>
cache.get(key, default=None, version=None)
新的参数default
的意思是,当请求的key不存在时,则返回default
设置的这个值,而不是默认不存在返回的`None
>>> cache.get('name')
>>>
>>> cache.get('name', 'has expired')
'has expired'
cache.add(key, value, timeout=DEFAULT_TIMEOUT, version=None)
与cache.set
类似,只是当add的key不存在时,则新建key,存在则不做任何操作
>>> cache.add('site', 'https://blog.ops-coffee.cn')
False
>>> cache.get('site')
'https://ops-coffee.cn'
>>>
>>> cache.get('name')
>>> cache.add('name', '运维咖啡吧 博客')
True
>>> cache.get('name')
'运维咖啡吧 博客'
新建成功则会返回True,否则返回False
cache.get_or_set(key, default, timeout=DEFAULT_TIMEOUT, version=None)
需要2个参数,第一个为key,第二个为key不存在时设置的值。get_or_set
意思是如果key存在,则返回key的值,如果不存在则给key设置一个值
>>> cache.get('name')
'运维咖啡吧 博客'
>>> cache.get_or_set('name', '咖啡吧博客')
'运维咖啡吧 博客'
>>>
>>> cache.get('path')
>>> cache.get_or_set('path', '/devops')
'/devops'
>>> cache.get('path')
'/devops'
cache.get_many(keys, version=None)
通过传入一个keys列表,以字典格式返回这些列表中key存在的缓存值
代码语言:javascript复制>>> cache.add('name', '运维咖啡吧 博客')
True
>>> cache.set('site', 'https://ops-coffee.cn')
>>> cache.get_many(['site','name','path'])
{'site': 'https://ops-coffee.cn', 'name': '运维咖啡吧 博客'}
cache.set_many(dict, timeout)
通过传入字典,批量设置缓存
代码语言:javascript复制>>> cache.set_many({'site':'ops-coffee.cn','name':'运维咖啡吧'})
cache.delete(key, version=None)
删除一个key
代码语言:javascript复制>>> cache.delete('site')
cache.delete_many(keys, version=None)
批量删除key
代码语言:javascript复制>>> cache.delete_many(['site','name'])
cache.clear()
清空缓存,需要注意的是这会删除缓存里的所有key,可能包含一些并不是你设置的key
代码语言:javascript复制>>> cache.clear()
cache.touch(key, timeout=DEFAULT_TIMEOUT, version=None)
更新key的过期时间为timeout设置的值,timeout是可选的,如果不写则默认为CACHES
设置的TIMEOUT
值
>>> cache.touch('site', 3)
True
更新成功则返回True,否则返回False
cache.incr(key, delta=1, version=None)
incr递增一个已存在的int类型的key的值,默认情况下递增幅度为1,通过指定delta
可以设置递增的幅度
>>> cache.set('num', 1)
>>> cache.incr('num')
2
>>> cache.incr('num', 10)
12
cache.decr(key, delta=1, version=None)
与incr递增类似,decr为递减
代码语言:javascript复制>>> cache.decr('num')
11
>>> cache.decr('num', 5)
6
cache.close()
如果缓存后端已经实现了close()方法,可以通过cache.close()
关闭和缓存的连接
代码优化
知道了代码存在的问题,也了解了Django中如何操作Cache,那么就可以着手优化上边的代码了,优化后的代码如下:
代码语言:javascript复制class Logger:
def __init__(self, tid, state=None):
self.tid = tid
self.state = state
self.datetime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
self.key = 'engine_subtasklog_%d' % self.tid
def add(self, details, sync=False):
subtasklog = SubTaskLog.objects.get(id=self.tid)
if details:
details = details.replace('n', ' ').replace('r', ' ')
newlog = self.datetime ' ' details ';'
# 将详情更新到缓存中
old_log = cache.get(self.key, '')
cache.set(self.key, old_log newlog) if old_log else cache.set(self.key, newlog)
if self.state is not None:
subtasklog.state = self.state
# 将缓存写入数据库并保存
subtasklog.details = cache.get(self.key)
subtasklog.save()
# 删除缓存
cache.delete(self.key)
根据任务ID创建缓存key,这条任务下的所有Log都会先写入缓存中,当任务结束时再将缓存中的日志一把写入数据库,从而减轻数据库压力,执行效率得到了极大的提升