前言
Flask 是 Python 开发的轻量 Web 框架,有多轻量呢?10 行以内就可以开发一个 Web 服务,不过这只能用来做演示,今天我就用 1 个小时来开发一个用于生产环境的短信微服务。以下是我们生产环境脱敏后直接可用的服务代码,绝非示例教程。
为什么要开发短信微服务?
短信服务我们都是依赖公有云的实现,通过公有云的 API 直接调用,那为什么还要自己封装呢?
- 因为微服务环境下我们要减少代码的重复量,如果有多个微服务需要使用短信服务,那就要复制多遍代码,把公有云的 API 包装成我们自己的微服务 API 可以将代码的复制减少为一行 Http 请求。
- 调用 API 的 accesskey 和 secret 不需要复制给多个服务,减少安全风险。
- 可以根据我们的业务需求加入共用的业务逻辑。
多了一层调用有没有性能影响?
多了一层调用是多了一个网络请求,但是影响微乎其微。我们不可能因为面向对象的方式太多调用就写逐行执行的代码吧。
- 公有云短信服务本就是异步调用,错误处理也是异步回调的方式。
- 微服务内部网络的调用应该是非常快的,可以同虚拟机部署或者同机房部署。
开始
首先我们建立项目的骨架。
为什么要建立项目的骨架呢?
因为 Flask 太过于轻量,所以例如配置、路由等规范需要由开发人员自己定义。一般成熟的开发团队都有自己的一套开发骨架,要统一配置,统一开发规范,统一集成相关系统等。我这里就分享一套适用于生产环境的非常简单的开发骨架。
新建一个项目目录,然后在里面建立 app 和 config 两个 Python 目录。app 用于存放业务相关代码,config 用于存放配置相关代码。
配置类
在 config/config.py
中添加如下内容,配置的设计因人而异,Flask 也没有做任何限制。我这里的设计是使用 BaseConfig 作为配置基类,存放所有共用的配置,而不同的环境使用不同的配置子类,子类只需要修改特定的值就可以,便于查看。
如果配置的值需要在运行是注入(如数据库连接等),则可以使用环境变量的方式(如下面的 SECRET_KEY),我同时使用 or
提供了没有环境变量的默认值。
import os
class BaseConfig:
"""
配置基类,用于存放共用的配置
"""
SECRET_KEY = os.environ.get('SECRET_KEY') or os.urandom(16)
DEBUG = False
TESTING = False
class ProductionConfig(BaseConfig):
"""
生产环境配置类,用于存放生产环境的配置
"""
pass
class DevelopmentConfig(BaseConfig):
"""
开发环境配置类,用于存放开发环境的配置
"""
DEBUG = True
class TestingConfig(BaseConfig):
"""
测试环境配置类,用于存放开发环境的配置
"""
DEBUG = True
TESTING = True
registered_app = [
'app'
]
config_map = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'testing': TestingConfig
}
复制代码
至于后面的 registered_app
和 config_map
有什么用?可以做自动注入,这个我后面会讲。
然后我加一个日志的配置,日志的配置非常重要,不同的开发团队往往有一套规范的日志配置模版,一般不会改变,所以可以直接定义在代码里,也可以用配置文件的方式。
config/logger.py
from logging.config import dictConfig
def config_logger(enable_console_handler=True, enable_file_handler=True, log_file='app.log', log_level='ERROR',
log_file_max_bytes=5000000, log_file_max_count=5):
# 定义输出到控制台的日志处理器
console_handler = {
'class': 'logging.StreamHandler',
'formatter': 'default',
'level': log_level,
'stream': 'ext://flask.logging.wsgi_errors_stream'
}
# 定义输出到文件的日志处理器
file_handler = {
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'detail',
'filename': log_file,
'level': log_level,
'maxBytes': log_file_max_bytes,
'backupCount': log_file_max_count
}
# 定义日志输出格式
default_formatter = {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
}
detail_formatter = {
'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
}
handlers = []
if enable_console_handler:
handlers.append('console')
if enable_file_handler:
handlers.append('file')
d = {
'version': 1,
'formatters': {
'default': default_formatter,
'detail': detail_formatter
},
'handlers': {
'console': console_handler,
'file': file_handler
},
'root': {
'level': log_level,
'handlers': handlers
}
}
dictConfig(d)
复制代码
上面就是一个典型的 Python 日志配置方法,把可变的部分定义为参数(日志文件、级别等),定义了两个日志处理器(文件和控制台),使用时只需要调用这个方法即可。
应用类
定义好配置,我们就开始创建我们的 Flask 应用了。用过 Flask 的同学知道,创建 Flask 应用只需要一行代码。
代码语言:javascript复制app = Flask(__name__)
复制代码
但这不是生产可用的方式,为了生产和测试方便,我们需要用一个方法获取这个 app 对象。
代码语言:javascript复制def create_app(conf=None):
# initialize logger
register_logger()
# check instance path
instance_path = os.environ.get('INSTANCE_PATH') or None
# create and configure the app
app = Flask(__name__, instance_path=instance_path)
if not conf:
conf = get_config_object()
app.config.from_object(conf)
# ensure the instance folder exists
if app.instance_path:
try:
os.makedirs(app.instance_path)
except OSError:
pass
# register app
register_app(app)
return app
复制代码
这里做了几个事情,一是注册日志类,二是载入配置对象,三是创建 instance
目录,四是注册应用业务。
为什么注册日志要放在第一行?
不少开发人员会把日志的配置放在配置类里,这个没太大问题,只是越早注册日志,你的日志就会越早开始收集。如果载入配置类后才配置日志,那如果创建 app 时报错就无法被我们定义的日志收集器收集到了。
注册日志的方法可以这样写
代码语言:javascript复制def register_logger():
log_level = os.environ.get('LOG_LEVEL') or 'INFO'
log_file = os.environ.get('LOG_FILE') or 'app.log'
config_logger(
enable_console_handler=True,
enable_file_handler=True,
log_level=log_level,
log_file=log_file
)
复制代码
我还是从环境变量里获取配置,并调用之前的配置函数配置日志。
载入配置对象的方法。
代码语言:javascript复制def get_config_object(env=None):
if not env:
env = os.environ.get('FLASK_ENV')
else:
os.environ['FLASK_ENV'] = env
if env in config.config_map:
return config.config_map[env]
else:
# set default env if not set
env = 'production'
return config.config_map[env]
复制代码
从 FLASK_ENV
这个环境变量获取运行的环境,然后根据之前配置类里的 config_map
获取对应的配置类,实现配置类的载入。
最后就是注册我们的业务代码。
代码语言:javascript复制def register_app(app):
for a in config.registered_app:
module = importlib.import_module(a)
if hasattr(module, 'register'):
getattr(module, 'register')(app)
复制代码
这里就用到了配置类里的 registered_app
列表,这里定义了要载入的模块,对于微服务来说,一般只有一个模块。
我这里还需要 app/__init__.py
文件里有个 register
方法,这个方法来执行具体的注册操作,例如注册 Flask 蓝图。
def register(app):
api_bp = Blueprint('api', __name__, url_prefix='/api')
app.register_blueprint(api_bp)
复制代码
为什么要搞个 register 方法?
因为每个业务模块有自己的路由、ORM 或蓝图等,这是业务自己的代码,必须与骨架解耦。用一个特定的方法作为规范一是便于自定义的代码扩展,二是便于团队理解,不需要灵活的配置,这里约定大于配置。当然你可以有自己的另一套实现。
我把上面的代码整理为 application.py
模块
import os
import importlib
from flask import Flask
from config.logger import config_logger
from config import config
def register_logger():
log_level = os.environ.get('LOG_LEVEL') or 'INFO'
log_file = os.environ.get('LOG_FILE') or 'app.log'
config_logger(
enable_console_handler=True,
enable_file_handler=True,
log_level=log_level,
log_file=log_file
)
def register_app(app):
for a in config.registered_app:
module = importlib.import_module(a)
if hasattr(module, 'register'):
getattr(module, 'register')(app)
def get_config_object(env=None):
if not env:
env = os.environ.get('FLASK_ENV')
else:
os.environ['FLASK_ENV'] = env
if env in config.config_map:
return config.config_map[env]
else:
# set default env if not set
env = 'production'
return config.config_map[env]
def create_app_by_config(conf=None):
# initialize logger
register_logger()
# check instance path
instance_path = os.environ.get('INSTANCE_PATH') or None
# create and configure the app
app = Flask(__name__, instance_path=instance_path)
if not conf:
conf = get_config_object()
app.config.from_object(conf)
# ensure the instance folder exists
if app.instance_path:
try:
os.makedirs(app.instance_path)
except OSError:
pass
# register app
register_app(app)
return app
def create_app(env=None):
conf = get_config_object(env)
return create_app_by_config(conf)
复制代码
这里提供了 create_app_by_config
方法用于从配置类直接创建 app
对象,主要是便于单元测试时直接注入特定的配置类。
我们的骨架基本上就成型了,包括了最基础的配置类、日志配置和应用注册机制。然后就可以运行我们的 Flask 应用了。
开发测试
Flask 提供了 flask run
命令来运行测试应用,不过还需要提供 FLASK_APP
和 FLASK_ENV
两个环境变量来启动,这步我们也可以简化下。
编写 run.py
import click
from envparse import env
from application import create_app
@click.command()
@click.option('-h', '--host', help='Bind host', default='localhost', show_default=True)
@click.option('-p', '--port', help='Bind port', default=8000, type=int, show_default=True)
@click.option('-e', '--env', help='Running env, override environment FLASK_ENV.', default='development', show_default=True)
@click.option('-f', '--env-file', help='Environment from file', type=click.Path(exists=True))
def main(**kwargs):
if kwargs['env_file']:
env.read_envfile(kwargs['env_file'])
app = create_app(kwargs['env'])
app.run(host=kwargs['host'], port=kwargs['port'])
if __name__ == '__main__':
main()
复制代码
这里用 click
创建了一个简单的命令行脚本,可以通过命令行参数直接启动一个测试用服务。当然默认参数直接可用,使用 python run.py
或者 IDE 里右键运行即可。同时,还提供了 env-file
选项,用户可提供环境变量的文件。
为什么要使用环境变量文件?
因为生产环境和开发环境的许多配置是不同的,例如公有云密钥,数据库连接等,这些信息是绝对不能提交到 git 等版本控制软件的,所以我们可以创建一个 .env 文件如下
代码语言:javascript复制ACCESS_KEY=xxx
ACCESS_SECRET=xxx
复制代码
把这个文件加入 gitignore 中,然后使用 --env-file
载入这个文件就可以在开发环境中直接使用了,而不需要每次都手动输入了。
部署
生产环境我们肯定不会使用测试的方式启动,需要类似 gunicorn
等工具启动一个正式服务,我们也可以使用 Docker 等容器技术把生产部署过程自动化。
编写 server.py
from application import create_app
app = create_app()
复制代码
这里很简单,创建一个 Flask app 对象即可,然后可以通过 gunicorn server:app
启动。
编写 requirements.txt
文件,用于自动安装依赖。后期可以把用到的依赖写进去。
flask
flask-restful
click
envparse
gunicorn
复制代码
编写 Dockerfile
文件
FROM python:3.8
COPY . /opt
WORKDIR /opt
RUN pip install --no-cache-dir -r requirements.txt
CMD ["gunicorn", "-b", "0.0.0.0:80", "server:app"]
复制代码
然后就可以使用如下命令用 Docker 启动服务容器了。
代码语言:javascript复制docker build -t myapp:0.1 .
docker run -d --name myapp -p 80:80 myapp:0.1
复制代码
至此,一个简单的 Flask 骨架就完成了,大家可以在下面看到完整的项目。
Github Flask 骨架示例
编写业务
上面大概用了 20 分钟搞了个 Flask 的骨架,对于开发团队来说,骨架只要开发一次,后续的项目直接克隆就行了。下面我们就来编写具体的发送短信业务。
使用哪个公有云?
实际业务中我们可能使用单一一个云,也可能混合使用多个云。在我们的实际业务中,具体用哪个公有云的服务,不是取决于我们,而是取决于谁的价格低,谁的优惠多,谁的功能强。?
所以我们可以提取短信业务的共性写一个抽象类。短信服务的共同点主要有短信模版,签名,接收人,模版参数等。
一个简单的抽象类
代码语言:javascript复制class SmsProvider:
def __init__(self, **kwargs):
self.conf = kwargs
def send(self, template, receivers, **kwargs):
pass
复制代码
然后有基于阿里云的实现,以下代码根据官方示例修改
代码语言:javascript复制class AliyunSmsProvider(SmsProvider):
def send(self, template, receivers, **kwargs):
from aliyunsdkcore.request import CommonRequest
client = self.get_client(self.conf['app_key'], self.conf['app_secret'], self.conf['region_id'])
request = CommonRequest()
request.set_accept_format('json')
request.set_domain(self.conf['domain'])
request.set_method('POST')
request.set_protocol_type('https')
request.set_version(self.conf['version'])
request.set_action_name('SendSms')
request.add_query_param('RegionId', self.conf['region_id'])
request.add_query_param('PhoneNumbers', receivers)
request.add_query_param('SignName', self.conf['sign_name'])
request.add_query_param('TemplateCode', self.get_template_id(template))
request.add_query_param('TemplateParam', self.build_template_params(**kwargs))
return client.do_action_with_exception(request)
def get_template_id(self, name):
if name in self.conf['template_id_map']:
return self.conf['template_id_map'][name]
else:
raise ValueError('no template {} found!'.format(name))
@staticmethod
def get_client(app_key, app_secret, region_id):
from aliyunsdkcore.client import AcsClient
return AcsClient(app_key, app_secret, region_id)
@staticmethod
def build_template_params(**kwargs):
if 'params' in kwargs and kwargs['params']:
return json.dumps(kwargs['params'])
else:
return ''
复制代码
然后在 BaseConfig
添加以下配置,是一些公有云 API 的基本配置,需要在运行是通过环境变量载入,其中 template_id_map
里的内容是模版的名称和对应的 ID,用于区分不同的短信模版,如验证码,推广等,名称作为参数供调用方使用,避免了直接传递 ID。
# SMS config
SMS_CONF = {
'aliyun': {
'provider_cls': 'app.sms.AliyunSmsProvider',
'config': {
'domain': 'dysmsapi.aliyuncs.com',
'version': os.environ.get('ALIYUN_SMS_VERSION') or '2017-05-25',
'app_key': os.environ.get('ALIYUN_SMS_APP_KEY'),
'app_secret': os.environ.get('ALIYUN_SMS_APP_SECRET'),
'region_id': os.environ.get('ALIYUN_SMS_REGION_ID'),
'sign_name': os.environ.get('ALIYUN_SMS_SIGN_NAME'),
'template_id_map': {
'captcha': 'xxx'
}
}
}
}
复制代码
其中模版 ID,签名,App Key,App Secret 需要在阿里云控制台获取,模版和签名需要审核后才能获得。
同样的方法可以添加华为云的 API,也可直接从示例修改,只是华为云暂时没有 SDK,需要通过 API 调用,大同小异。
代码语言:javascript复制class HuaweiSmsProvider(SmsProvider):
def send(self, template, receivers, **kwargs):
header = {'Authorization': 'WSSE realm="SDP",profile="UsernameToken",type="Appkey"',
'X-WSSE': self.build_wsse_header(self.conf['app_key'], self.conf['app_secret'])}
form_data = {
'from': self.conf['sender'],
'to': receivers,
'templateId': self.get_template_id(template),
'templateParas': self.build_template_params(**kwargs),
}
r = requests.post(self.conf['url'], data=form_data, headers=header, verify=False)
return r
def get_template_id(self, name):
if name in self.conf['template_id_map']:
return self.conf['template_id_map'][name]
else:
raise ValueError('no template {} found!'.format(name))
@staticmethod
def build_wsse_header(app_key, app_secret):
now = time.strftime('%Y-%m-%dT%H:%M:%SZ')
nonce = str(uuid.uuid4()).replace('-', '')
digest = hashlib.sha256((nonce now app_secret).encode()).hexdigest()
digest_base64 = base64.b64encode(digest.encode()).decode()
return 'UsernameToken Username="{}",PasswordDigest="{}",Nonce="{}",Created="{}"'.format(app_key, digest_base64,
nonce, now)
@staticmethod
def build_template_params(**kwargs):
if 'params' in kwargs and kwargs['params']:
return json.dumps(list(kwargs['params'].values()))
else:
return ''
复制代码
也是添加配置,最后的 BaseConfig
如下所示,其中 SMS_PROVIDER
配置指定 SMS_CONF
的键,指定我们现在使用的是哪个公有云服务:
class BaseConfig:
SECRET_KEY = os.environ.get('SECRET_KEY') or os.urandom(16)
DEBUG = False
TESTING = False
# SMS config
SMS_PROVIDER = os.environ.get('SMS_PROVIDER')
SMS_CONF = {
'aliyun': {
'provider_cls': 'app.sms.AliyunSmsProvider',
'config': {
'domain': 'dysmsapi.aliyuncs.com',
'version': os.environ.get('ALIYUN_SMS_VERSION') or '2017-05-25',
'app_key': os.environ.get('ALIYUN_SMS_APP_KEY'),
'app_secret': os.environ.get('ALIYUN_SMS_APP_SECRET'),
'region_id': os.environ.get('ALIYUN_SMS_REGION_ID'),
'sign_name': os.environ.get('ALIYUN_SMS_SIGN_NAME'),
'template_id_map': {
'captcha': 'xxx'
}
}
},
'huawei': {
'provider_cls': 'app.sms.HuaweiSmsProvider',
'config': {
'url': os.environ.get('HUAWEI_URL'),
'app_key': os.environ.get('HUAWEI_SMS_APP_KEY'),
'app_secret': os.environ.get('HUAWEI_SMS_APP_SECRET'),
'sender': os.environ.get('HUAWEI_SMS_SENDER_ID'),
'template_id_map': {
'captcha': 'xxx'
}
}
}
}
复制代码
其他的公有云也可以通过类似的方式添加。
然后我们添加一个方法,获取 Provider 的单例对象。这里使用 Flask 的 g 对象,把我们的 Provider 对象注册成全局的单例对象。
代码语言:javascript复制from flask import g, current_app
from werkzeug.utils import import_string
def create_sms():
provider = current_app.config['SMS_PROVIDER']
sms_config = current_app.config['SMS_CONF']
if provider in sms_config:
cls = sms_config[provider]['provider_cls']
conf = sms_config[provider]['config']
sms = import_string(cls)(**conf)
return sms
return None
def get_sms():
if 'sms' not in g:
g.sms = create_sms()
return g.sms
复制代码
这些都完成后,就可以添加一个视图类,这里用到了 Flask-Restful 库,生成 API 视图。
app/api/sms.py
import logging
from flask_restful import Resource, reqparse
from app.sms import get_sms
# 定义参数,参考 https://flask-restful.readthedocs.io/en/latest/reqparse.html
parser = reqparse.RequestParser(bundle_errors=True)
parser.add_argument('receivers', help='Comma separated receivers.', required=True)
parser.add_argument('template', help='Notification template name.', required=True)
parser.add_argument('params', help='Notification template params.', type=dict)
class Sms(Resource):
def post(self):
args = parser.parse_args()
sms = get_sms()
try:
res = sms.send(**args)
except Exception as e:
logging.error(e)
return {'message': 'failed'}, 500
if res.status_code < 300:
return {'message': 'send'}, 200
else:
logging.error('Send sms failed with {}'.format(res.text))
return {'message': 'failed'}, 500
复制代码
然后我们定义路由。
app/api/__init__.py
from flask import Blueprint
from flask_restful import Api
from app.api.health import Health
from app.api.sms import Sms
api_bp = Blueprint('api', __name__, url_prefix='/api')
api = Api(api_bp)
api.add_resource(Sms, '/sms')
复制代码
最后记得在我们的应用 app 模块里注册蓝图。
app/__init__.py
from app.api import api_bp
# register blueprint
def register(app):
app.register_blueprint(api_bp)
复制代码
至此,我们的短信微服务就完成了。可以通过我们上面的方法进行测试和部署。
其中我们定义了一些环境变量,在测试时可通过环境变量文件载入,运行时可通过容器的环境变量载入。放在 instance 目录下是因为 instance 是我们默认的 Flask 实例目录,这个目录是不会提交到 git 里的。
instance/env
SMS_PROVIDER=huawei
HUAWEI_URL=https://rtcsms.cn-north-1.myhuaweicloud.com:10743/sms/batchSendSms/v1
HUAWEI_SMS_APP_KEY=aaa
HUAWEI_SMS_APP_SECRET=bbb
HUAWEI_SMS_SENDER_ID=ccc
复制代码
运行时通过环境变量载入
代码语言:javascript复制docker run -d --name sms -p 80:80
-e "SMS_PROVIDER=aliyun"
-e "ALIYUN_SMS_APP_KEY=aaa"
-e "ALIYUN_SMS_APP_SECRET=bbb"
-e "ALIYUN_SMS_REGION_ID=cn-hangzhou"
-e "ALIYUN_SMS_SIGN_NAME=ccc"
myapp:0.1
复制代码
完整的项目可在这里查看。
示例项目代码
然后我们可以做如下测试,注意修改配置中的模版 ID 和环境变量,并根据自己的模版参数修改 params。
结语
对于老鸟来说,开发这个项目,可能根本不需要 1 个小时。对于规范的线上项目来说,还是缺少一些东西的,例如单元测试。大家的生产 API 服务是怎么样的?欢迎讨论!