假设我们实现了一个程序,它从 Redis 读取数据,然后写入 MongoDB。一开始程序是这样的:
代码语言:txt复制def read_from_redis():
...
def write_to_mongodb(doc):
...
def parse():
for doc in self.read_from_redis():
self.write_to_mongodb(doc)
在for doc in self.read_from_redis()
的循环中,每次循环返回的是一个字典,这个字典包含很多项,例如age
、date
等等。我们需要设计一些逻辑对这个数据进行处理或者过滤。
但这些逻辑是逐渐增加,一开始只有一个需求,就是如果发现doc
的age
字段中,如果age
不是数字且不能转换为数字,那么需要把它改成N/A
。
后来又增加了一个新的需求,如果doc
里面的date
字段对应的日期小于2020-05-01
,那么这条数据直接丢弃。
接下来还要新增很多其他的需求。为了避免反复修改代码,我们可以实现一个轻量级的插件系统。
我们先实现调用这个插件系统的部分:
代码语言:txt复制plugins = {}
def read_from_redis():
datas = [
{'age': 34, 'name': 'xxx', 'date': '2020-05-10'},
{'age': 12, 'name': 'yyy', 'date': '2020-04-03'},
{'age': '23', 'name': 'zzz', 'date': '2020-05-12'},
{'age': 'aa', 'name': 'abc', 'date': '2020-05-10'},
{'age': 89, 'name': 'def', 'date': '2020-02-10'},
{'age': '', 'name': 'xyz', 'date': '2020-05-10'},
{'age': 'xy', 'name': 'xxx', 'date': '2020-05-10'},
{'age': 'mp', 'name': 'xxx', 'date': '2020-05-10'},
{'age': 34, 'name': 'xxx', 'date': '2019-02-10'},
]
for data in datas:
yield data
def write_to_mongodb(doc):
print(f'正在把数据:{doc} 写入到 MongoDB 中')
def parse():
for doc in read_from_redis():
for name, plugin in plugins.items():
print(f'正在运行插件:{name}')
doc = plugin(doc)
if not doc:
print(f'数据: {doc},被插件:{name}过滤。')
continue
write_to_mongodb(doc)
if __name__ == '__main__':
parse()
看到这里,你会不会觉得很奇怪,这里的plugins
不是一个空字典吗?那你下面的 for 循环怎么能够执行呢?
不慌,我们现在使用装饰器把插件注册
到plugins
中:
def register(plugin):
plugins[plugin.__name__] = plugin
return plugin
@register
def transfer_age_to_int(doc):
try:
doc['age'] = int(doc['age'])
except ValueError:
doc['age'] = 'N/A'
return doc
@register
def filter_date(doc):
date = doc['doc']
if date < '2020-05-01':
return None
return doc
这个装饰器,它的作用就是把它装饰的函数存到plugins
字典中。所以当我们使用装饰器装饰一个 plugin 函数的时候,它就已经被自动注册到plugins
字典中了。不需要我们再手动存放一次。
下面我们来实际运行一下:
可以看到,不是数字且不能被转换为数字的age
字段的值被改成了N/A
;date
小于2020-05-01
的数据就直接丢弃了。