Title: 编写企业微信应用 - 回调URL验证笔记
公号: 编写企微应用 - 回调URL验证笔记 (qq.com)
前言
- 第一版是在
2021-08-11 09:36:32
写的,有点乱,优化下 - 验证需要搭建HTTP服务器,使用的框架为Python-Flask
- 通过API向手机发送信息已解决,目前需要解决通过API接收手机发送的信息
- 添加回调URL需要先验证URL正确性,企业微信服务器会往URL发送一个请求,我的服务器需要解密请求中的信息并返回,完成验证
- 验证URL正确性使用的是GET请求,接收业务数据使用的是POST请求,本笔记只讨论验证URL。
笔记
要做的事
- 写好程序,上传到服务器
- 到腾讯云给网址添加一条新的解析,用于回调
- 宝塔面板新建新建一个网站,并且设置好反向代理
- 去企业微信验证URL
添加新解析与设置反向代理
为什么要添加新解析
- 方便管理,一个URL对应一个应用
- 需要给宝塔创建网站
为什么要添加反向代理
- 为了防止端口冲突,这次验证用的6363端口
- 同时回调URL只能用80端口/443端口
- 因此要把发向新建这个网站80端口的数据转到服务器6363端口
到企业微信添加回调URL
如何添加
- 打开桌面版企业微信后台
- 进入应用管理,选择想添加的应用
- 在"功能"处找到"接收消息"点击上面的"api接收"
- 填入回调URL,并随机生成
Token
和EncodingAESKey
- 按需求选择需要接收的消息事件类型
- 点击"保存"
- 如果自己写的验证程序没有问题且服务器端口等都打开了,会显示保存成功
获得的参数
Token
和EncodingAESKey
Token
在校验消息体签名时会用到EncodingAESKey
在解密接收到的消息内容时会用到
编写验证程序
- 代码编写前需要先安装加解密库
python 在 Windows下使用AES时要安装的是pycryptodome 模块:pip install pycryptodome
python 在 Linux下使用AES时要安装的是pycrypto模块:pip install pycrypto
企业微信回调URL验证请求格式
- 基本信息
假设企业的接收消息的URL设置为http://api.3dept.com。
企业管理员在保存回调配置信息时,企业微信会发送一条验证消息到填写的URL,请求内容如下:
请求方式:GET
请求地址:http://api.3dept.com/?msg_signature=ASDFQWEXZCVAQFASDFASDFSS×tamp=13500001234&nonce=123412323&echostr=ENCRYPT_STR
- 参数说明
参数 | 类型 | 说明 |
---|---|---|
msg_signature | String | 企业微信加密签名,msg_signature计算结合了企业填写的token、请求中的timestamp、nonce、加密的消息体。签名计算方法参考 消息体签名检验 |
timestamp | Integer | 时间戳。与nonce结合使用,用于防止请求重放攻击。 |
nonce | String | 随机数。与timestamp结合使用,用于防止请求重放攻击。 |
echostr | String | 加密的字符串。需要解密得到消息内容明文,解密后有random、msg_len、msg、receiveid四个字段,其中msg即为消息内容明文 |
大概步骤
- 接收到企业微信服务器请求
- 用请求中的
msg_signature
(消息签名)参数验证消息正确性 - 解密请求中
echostr
参数的内容,获得消息明文 - 响应请求,将解密得到的明文直接返回
- 注:请求需要在1秒内响应
消息体签名校验:`msg_signature`
步骤
- 将
token(添加回调URL时从企业微信处获得的)
、timestamp
、nonce
、encrypt
参数按字母字典从小到大排序(用sort) - 将排序后的参数按顺序拼接成一个字符串
- 将该字符串进行SHA-1计算
- 如果计算结果与请求中的
msg_signature
相同,则通过
示例代码
代码语言:python代码运行次数:0复制# 消息体签名校验
def check_msg_signature(msg_signature,token,timestamp,nonce,echostr):
# 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值]
li = [token,timestamp,nonce,echostr]
li.sort()
# 将排序结果拼接
li_str = li[0] li[1] li[2] li[3]
# 计算SHA-1值
sha1 = hashlib.sha1()
# update()要指定加密字符串字符代码,不然要报错:
# "Unicode-objects must be encoded before hashing"
sha1.update(li_str.encode("utf8"))
sha1_result = sha1.hexdigest()
# 比较并返回比较结果
if sha1_result == msg_signature:
return True
else:
return False
解密消息密文`encrypt`得到信息明文
前言
- 加密用的是AES-256-CBC,本次的研究结果在这:[AES_256_CBC解密研究_企业微信,2021081601]
`encrypt`明文组成部分
- [16字节随机字节] [4字节
msg_len
] [msg_len
字节消息正文] [N字节receiveid
] - 第二部分
msg_len
存放的是正文消息字节数,用的是16进制存储。 - 例如
msg_len
为000A
则正文消息共有10字节,第21到31字节为正文消息。 - 因为encrypt前16字节为随机字节,属于无用数据,所有不管iv是什么,都可以解密并获得msg消息正文
步骤
- 处理密钥
EncodingAESKey
- 使用
AES-CBC
解密encrypt
- 从解密的
encrypt
中获得msg_len
- 通过
msg_len
得到正文字节数,并从解密的encrypt
中截取消息明文
代码
- 解密并且提取出明文msg的代码示例
import base64
from Crypto.Cipher import AES
# 检查base64编码后数据位数是否正确
def check_base64_len(base64_str):
len_remainder = 4 - (len(base64_str) % 4)
if len_remainder == 0:
return base64_str
else:
for temp in range(0,len_remainder):
base64_str = base64_str "="
return base64_str
ciphertext = "rgF0ehjjxx4fdkdwZyeJ5qxJUAsfGczgK2VQDORoML4K7ou TGFKNicYDgdpPTU0/AZEgOFQAh5bU3MmX2pOlw=="
key = "eeg80S7mUubAJwsPuIEg3bfRfghCbN4zC864e7PV928"
# 处理密文、密钥和iv
ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext))
key_bytes = base64.b64decode(check_base64_len(key))
iv_bytes = key_bytes[:16]
# 解密
decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes)
plaintext_bytes = decr.decrypt(ciphertext_bytes)
# 截取数据,判断消息正文字节数
msg_len_bytes = plaintext_bytes[16:20]
msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False)
# 根据消息正文字节数截取消息正文,并转为字符串格式
msg_bytes = plaintext_bytes[20:20 msg_len]
msg = str(msg_bytes,encoding='utf-8')
# 打印消息正文
print(msg)
完整的服务器验证代码
- 里面的
EncodingAESKey
和token
变量值需要到企业微信管理后台获取
from flask import Flask
from flask.globals import request
from Crypto.Cipher import AES
import base64,hashlib,xmltodict,json
app = Flask(__name__)
# 检查base64编码后数据位数是否正确
def check_base64_len(base64_str):
len_remainder = 4 - (len(base64_str) % 4)
if len_remainder == 0:
return base64_str
else:
for temp in range(0,len_remainder):
base64_str = base64_str "="
return base64_str
# 解密并提取消息正文
def msg_base64_decrypt(ciphertext_base64,key_base64):
# 处理密文、密钥和iv
ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext_base64))
key_bytes = base64.b64decode(check_base64_len(key_base64))
iv_bytes = key_bytes[:16]
# 解密
decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes)
plaintext_bytes = decr.decrypt(ciphertext_bytes)
# 截取数据,判断消息正文字节数
msg_len_bytes = plaintext_bytes[16:20]
msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False)
# 根据消息正文字节数截取消息正文,并转为字符串格式
msg_bytes = plaintext_bytes[20:20 msg_len]
msg = str(msg_bytes,encoding='utf-8')
return msg
# 消息体签名校验
def check_msg_signature(msg_signature,token,timestamp,nonce,echostr):
# 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值]
li = [token,timestamp,nonce,echostr]
li.sort()
# 将排序结果拼接
li_str = li[0] li[1] li[2] li[3]
# 计算SHA-1值
sha1 = hashlib.sha1()
# update()要指定加密字符串字符代码,不然要报错:
# "Unicode-objects must be encoded before hashing"
sha1.update(li_str.encode("utf8"))
sha1_result = sha1.hexdigest()
# 比较并返回比较结果
if sha1_result == msg_signature:
return True
else:
return False
@app.route("/",methods=["get"])
def wx_check_api():
EncodingAESKey = ""
token = ""
msg_signature = request.args.to_dict().get("msg_signature")
timestamp = request.args.to_dict().get("timestamp")
nonce = request.args.to_dict().get("nonce")
echostr = request.args.to_dict().get("echostr")
# 获取消息体签名校验结果
check_result = check_msg_signature(msg_signature,token,timestamp,nonce,echostr)
if check_result:
decrypt_result = msg_base64_decrypt(echostr,EncodingAESKey)
print("通过")
return decrypt_result
else:
return ""
if __name__ == "__main__":
app.run(debug=False,host='0.0.0.0',port=6363)