写在前面
- 有这样一个需求
- 以文件的方式定期给集团同步增量数据,我想把所有的静态数据抽离出来,通过配置文件的方式
- 需求比较简单,所以用选择
python
- 配置文件用
yaml
,写了一个小模块- 实现
配置文件
读入内存为配置字典
- 实现
配置文件
的动态加载
读入内存为配置字典
- 实现
配置字典
由内存导出静态文件
- 实现
- 理解错误的地方请小伙伴批评指正
「 我只是怕某天死了,我的生命却一无所有。----《奇幻之旅》」
这里需要说明的是,常说的动态加载配置
,一般基于观察者设计模式
实现的发布/订阅系统
,一般有两种模式,分别是推(Push)
模式和拉(Pull)
模式。
推模式
:服务端主动将数据更新发送给所有订阅的客户端,拉模式
:由客户端主动发起请求来获取最新数据,通常客户端都采用定时进行轮询拉取的方式。
我们这里只是提供了一个可以动态加载配置文件刷新配置对象
的方法,把配置对象
定义为单例
,刷新的时候把当前存在的配置对象干掉,然后从新加载配置文件生成新的配置对象。即通过拉(Pull)
的方式实现。这里配置对象为主题
,使用配置对象的多个代码为观察者
。
先来看一下脚本yaml_util.py
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""
@File : yaml_util.py
@Time : 2022/03/22 14:10:46
@Author : Li Ruilong
@Version : 1.0
@Contact : 1224965096@qq.com
@Desc : 加载配置文件
pip install pyyaml
"""
# here put the import lib
import os
import time
import yaml
import logging
import json
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
class Yaml:
_config = None
def __new__(cls, *args, **kw):
# hasattr函数用于判断对象是否包含对应的属性。
if not hasattr(cls, '_instance'):
cls._instance = object.__new__(cls)
return cls._instance
def __init__(self, file_name="config.yaml"):
config_temp = None
try:
# 获取当前脚本所在文件夹路径
cur_path = os.path.dirname(os.path.realpath(__file__))
# 获取yaml文件路径
yaml_path = os.path.join(cur_path, file_name)
f = open(yaml_path, 'r', encoding='utf-8')
config_temp = f.read()
except Exception as e:
logging.info("配置文件加载失败", e)
finally:
f.close()
self._config = yaml.safe_load(config_temp) # 用load方法转化
def __str__(self):
return json.dumps(self._config)
def __del__(self):
self._config = None
self = None
@staticmethod
def get_config(file_name="config.yaml"):
return Yaml(file_name)._config
@staticmethod
def refresh_config(cls, file_name="config.yaml"):
del cls
return Yaml(file_name)._config
def set_config(contain, file_name="config_.yaml"):
# 配置字典由内存导入静态文件
cur_path = os.path.dirname(os.path.realpath(__file__))
yaml_path = os.path.join(cur_path, file_name)
with open(yaml_path, 'w', encoding='utf-8') as f:
yaml.dump(contain, f)
def get_yaml_config(file_name="config.yaml"):
# 配置文件读入内存为配置字典
return Yaml.get_config(file_name)
def refresh_yaml_config(cls, file_name="config.yaml"):
# 配置文件的动态加载读入内存为字典
return Yaml.refresh_config(cls,file_name)
if __name__ == '__main__':
my_yaml_1 = Yaml()
my_yaml_2 = Yaml()
#id关键字可用来查看对象在内存中的存放位置
print(id(my_yaml_1) == id(my_yaml_2))
time.sleep(10)
# 修改配置文件后从新加载配置字典会刷新
refresh_yaml_config(my_yaml_1)
上面是写好加载配置类模块,下面为定义的配置文件
。
# mysql数据库相关配置
mysql:
db_host: "127.0.0.1"
db_port: 3306
db_user: "root"
db_password: "root"
db_name: "uamdb"
# ssh相关配置
ssh:
ssh_hostname: "192.168.26.55"
ssh_username: "root"
ssh_password: 'redhat'
ssh_port: 22
#文件模式UAG 省份标识 年月日 序号.txt
file_name_template: "UAG07{0}{1}.txt"
# 本地文件存储位置
local_file_path: "./uam/sftp/"
# 集团服务器上传文件位置
remote_path: "/app/sftp/ftpuam07/increment/"
# 上传周期
period:
date: YEAR #MINUTE,HOUR ,DAY,WEEK,MONTH,YEAR
offset: 1 #几天,几周,几月 前的数据
# 增量数据模板: 文件内容格式:账号,账号类型,账号密码,客户ID,密码加密类型,省份编码,账号状态
template:
- line: "{unified_code}^{unified_type}^{unified_pwd}^{cust_codes}^Pwd007^07^{unified_state}"
sql: "SELECT a.unified_code as unified_code ,a.unified_type as unified_type ,a.unified_pwd as unified_pwd
,a.unified_state as unified_state , b.cust_code as cust_codes FROM AU_UNIFIED a INNER JOIN AU_PRODUCT b
ON a.unified_code = b.product_code AND a.chg_date > SUBDATE( NOW( ), INTERVAL {0} {1} )"
- line: "{product_code}^{prod_type}^{product_pwd}^{cust_code}^{pwd_type}^07^St001"
sql: "SELECT product_code, product_pwd,cust_code,prod_type,
(CASE WHEN a.prod_type='2000004' then 'Pwd001' WHEN a.prod_type='2000001' then 'Pwd001' WHEN a.prod_type='2000002' then 'Pwd001'
WHEN a.prod_type='2110008' then 'Pwd001' WHEN a.prod_type='2110011' then 'Pwd001' ELSE 'Pwd001' END) as pwd_type
from AU_PRODUCT a WHERE a.chg_date > SUBDATE( NOW( ), INTERVAL {0} {1} )"
「如何使用」
下面为一个数据库连接池工具类的使用。
代码语言:javascript复制.....
import yaml_util
from dbutils.pooled_db import PooledDB
class MysqlPool:
def __init__(self):
mysql = yaml_util.get_config()["mysql"]
print(mysql)
self.POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
host=mysql["db_host"],
port=mysql["db_port"],
user=mysql["db_user"],
password=mysql["db_password"],
database=mysql["db_name"],
charset='utf8'
)
...................
「关于如何触发刷新配置文件
方法」
我们这里修改完配置文件通过UI界面
主动调用函数加载。其他项目场景个人觉得可以通过心跳
或者探针
的机制传递文件摘要信息串
(通过MD5
,SHA
等信息摘要算法生成)进行比对,具体的手段可以通过类似脏值轮询检查
或者数据劫持
等方式
「关于观察者设计模式
,是一个很常用的设计模式」
基于MVVM
模式的前端框架中,双向数据绑定特性,如Vue.js
都是基于此,在系统运行过程中,一旦系统中的数据模型发生了变化,观察者 Observer
的setter
访问器属性就会被触发,此时消息订阅中心会遍历它所维护的所有订阅者,对于每一个订阅了该数据的对象,向它发出一个更新通知
,订阅者收到通知后就会对视图进行相应的更新。以上过程不断往复循环,这就是MVVM
模式在Vue.js
中的运行原理。
后端分布式一致性解决方案
中,使用ZooKeeper
做配置中心
时,也是基于观察者模式
,采用的是推拉相结合
的方式,客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送Watcher
事件通知,客户端接收到这个消息通知之后,需要主动到服务端获取最新的数据。将配置信息存放到ZooKeeper
上进行集中管理,应用在启动的时候都会主动到ZooKeeper
服务端上进行一次配置信息的获取,同时,在指定节点上注册一个Watcher
监听,这样一来,但凡配置信息发生变更,服务端都会实时通知到所有订阅的客户端,从而达到实时获取最新配置信息的目的。
包括JDK
很早的版本就有观察者模式
的实现,java.util
包内包含最基本的Observer接口
与Observable类
,不过主题类(Observable)
定义成了一个基本类不是接口,所以只能通过继承
的方式实现,考虑的继承的可维护性太差
,一般不怎么使用。
「关于单例模式」
单例模式
是很常用的一种设计模式,即在整个生命周期中,对于该一个类生产的对象始终都是一个,不曾变化。保证了一个类仅有一个实例,并提供一个访问它的全局访问点。
单例的优点
有很多,GOF
中这样描述:
对唯一实例
的受控访问,缩小名空间
,Singleton模式
是对全局变量的一种改进。它避免了那些存储唯一实例的全局变量污染名空间。允许对操作和表示的精化,Singleton类可以有子类,而且用这个扩展类的实例来配置一个应用是很容易的。你可以用你所需要的类的实例在运行时刻配置应用等等,感兴趣小伙伴可以去看看《设计模式_可复用面向对象软件的基础》