编写企业微信应用 - 回调URL验证笔记

2022-05-14 18:49:13 浏览数 (1)

Title: 编写企业微信应用 - 回调URL验证笔记

公号: 编写企微应用 - 回调URL验证笔记 (qq.com)

前言

  • 第一版是在2021-08-11 09:36:32写的,有点乱,优化下
  • 验证需要搭建HTTP服务器,使用的框架为Python-Flask
  • 通过API向手机发送信息已解决,目前需要解决通过API接收手机发送的信息
  • 添加回调URL需要先验证URL正确性,企业微信服务器会往URL发送一个请求,我的服务器需要解密请求中的信息并返回,完成验证
  • 验证URL正确性使用的是GET请求,接收业务数据使用的是POST请求,本笔记只讨论验证URL。

笔记

要做的事

  • 写好程序,上传到服务器
  • 到腾讯云给网址添加一条新的解析,用于回调
  • 宝塔面板新建新建一个网站,并且设置好反向代理
  • 去企业微信验证URL

添加新解析与设置反向代理

为什么要添加新解析

  • 方便管理,一个URL对应一个应用
  • 需要给宝塔创建网站

为什么要添加反向代理

  • 为了防止端口冲突,这次验证用的6363端口
  • 同时回调URL只能用80端口/443端口
  • 因此要把发向新建这个网站80端口的数据转到服务器6363端口

到企业微信添加回调URL

如何添加

  • 打开桌面版企业微信后台
  • 进入应用管理,选择想添加的应用
  • 在"功能"处找到"接收消息"点击上面的"api接收"
  • 填入回调URL,并随机生成TokenEncodingAESKey
  • 按需求选择需要接收的消息事件类型
  • 点击"保存"
  • 如果自己写的验证程序没有问题且服务器端口等都打开了,会显示保存成功

获得的参数

  • TokenEncodingAESKey
  • Token在校验消息体签名时会用到
  • EncodingAESKey在解密接收到的消息内容时会用到

编写验证程序

  • 代码编写前需要先安装加解密库
代码语言:javascript复制
python 在 Windows下使用AES时要安装的是pycryptodome 模块:pip install pycryptodome 
python 在 Linux下使用AES时要安装的是pycrypto模块:pip install pycrypto

企业微信回调URL验证请求格式

  • 基本信息
代码语言:javascript复制
假设企业的接收消息的URL设置为http://api.3dept.com。
企业管理员在保存回调配置信息时,企业微信会发送一条验证消息到填写的URL,请求内容如下:

请求方式:GET
请求地址:http://api.3dept.com/?msg_signature=ASDFQWEXZCVAQFASDFASDFSS&timestamp=13500001234&nonce=123412323&echostr=ENCRYPT_STR
  • 参数说明

参数

类型

说明

msg_signature

String

企业微信加密签名,msg_signature计算结合了企业填写的token、请求中的timestamp、nonce、加密的消息体。签名计算方法参考 消息体签名检验

timestamp

Integer

时间戳。与nonce结合使用,用于防止请求重放攻击。

nonce

String

随机数。与timestamp结合使用,用于防止请求重放攻击。

echostr

String

加密的字符串。需要解密得到消息内容明文,解密后有random、msg_len、msg、receiveid四个字段,其中msg即为消息内容明文

大概步骤

  • 接收到企业微信服务器请求
  • 用请求中的msg_signature(消息签名)参数验证消息正确性
  • 解密请求中echostr参数的内容,获得消息明文
  • 响应请求,将解密得到的明文直接返回
  • 注:请求需要在1秒内响应

消息体签名校验:`msg_signature`

步骤
  • token(添加回调URL时从企业微信处获得的)timestampnonceencrypt参数按字母字典从小到大排序(用sort)
  • 将排序后的参数按顺序拼接成一个字符串
  • 将该字符串进行SHA-1计算
  • 如果计算结果与请求中的msg_signature相同,则通过
示例代码
代码语言:python代码运行次数:0复制
# 消息体签名校验
def check_msg_signature(msg_signature,token,timestamp,nonce,echostr):
    # 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值]
    li = [token,timestamp,nonce,echostr]
    li.sort()
    # 将排序结果拼接
    li_str = li[0] li[1] li[2] li[3]

    # 计算SHA-1值
    sha1 = hashlib.sha1()
    # update()要指定加密字符串字符代码,不然要报错:
    # "Unicode-objects must be encoded before hashing"
    sha1.update(li_str.encode("utf8"))
    sha1_result = sha1.hexdigest()

    # 比较并返回比较结果
    if sha1_result == msg_signature:
        return True
    else:
        return False

解密消息密文`encrypt`得到信息明文

前言
  • 加密用的是AES-256-CBC,本次的研究结果在这:[AES_256_CBC解密研究_企业微信,2021081601]
`encrypt`明文组成部分
  • [16字节随机字节] [4字节msg_len] [msg_len字节消息正文] [N字节receiveid]
  • 第二部分msg_len存放的是正文消息字节数,用的是16进制存储。
  • 例如msg_len000A则正文消息共有10字节,第21到31字节为正文消息。
  • 因为encrypt前16字节为随机字节,属于无用数据,所有不管iv是什么,都可以解密并获得msg消息正文
步骤
  • 处理密钥EncodingAESKey
  • 使用AES-CBC解密encrypt
  • 从解密的encrypt中获得msg_len
  • 通过msg_len得到正文字节数,并从解密的encrypt中截取消息明文
代码
  • 解密并且提取出明文msg的代码示例
代码语言:python代码运行次数:0复制
import base64
from Crypto.Cipher import AES

# 检查base64编码后数据位数是否正确
def check_base64_len(base64_str):
    len_remainder = 4 - (len(base64_str) % 4)
    if len_remainder == 0:
        return base64_str
    else:
        for temp in range(0,len_remainder):
            base64_str = base64_str   "="
        return base64_str

ciphertext = "rgF0ehjjxx4fdkdwZyeJ5qxJUAsfGczgK2VQDORoML4K7ou TGFKNicYDgdpPTU0/AZEgOFQAh5bU3MmX2pOlw=="
key = "eeg80S7mUubAJwsPuIEg3bfRfghCbN4zC864e7PV928"

# 处理密文、密钥和iv
ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext))
key_bytes = base64.b64decode(check_base64_len(key))
iv_bytes = key_bytes[:16]

# 解密
decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes)
plaintext_bytes = decr.decrypt(ciphertext_bytes)

# 截取数据,判断消息正文字节数
msg_len_bytes = plaintext_bytes[16:20]
msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False)

# 根据消息正文字节数截取消息正文,并转为字符串格式
msg_bytes = plaintext_bytes[20:20 msg_len]
msg = str(msg_bytes,encoding='utf-8')

# 打印消息正文
print(msg)

完整的服务器验证代码

  • 里面的EncodingAESKeytoken变量值需要到企业微信管理后台获取
代码语言:python代码运行次数:0复制
from flask import Flask
from flask.globals import request
from Crypto.Cipher import AES
import base64,hashlib,xmltodict,json

app = Flask(__name__)

# 检查base64编码后数据位数是否正确
def check_base64_len(base64_str):
    len_remainder = 4 - (len(base64_str) % 4)
    if len_remainder == 0:
        return base64_str
    else:
        for temp in range(0,len_remainder):
            base64_str = base64_str   "="
        return base64_str
# 解密并提取消息正文
def msg_base64_decrypt(ciphertext_base64,key_base64):
    # 处理密文、密钥和iv
    ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext_base64))
    key_bytes = base64.b64decode(check_base64_len(key_base64))
    iv_bytes = key_bytes[:16]

    # 解密
    decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes)
    plaintext_bytes = decr.decrypt(ciphertext_bytes)

    # 截取数据,判断消息正文字节数
    msg_len_bytes = plaintext_bytes[16:20]
    msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False)

    # 根据消息正文字节数截取消息正文,并转为字符串格式
    msg_bytes = plaintext_bytes[20:20 msg_len]
    msg = str(msg_bytes,encoding='utf-8')

    return msg

# 消息体签名校验
def check_msg_signature(msg_signature,token,timestamp,nonce,echostr):
    # 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值]
    li = [token,timestamp,nonce,echostr]
    li.sort()
    # 将排序结果拼接
    li_str = li[0] li[1] li[2] li[3]

    # 计算SHA-1值
    sha1 = hashlib.sha1()
    # update()要指定加密字符串字符代码,不然要报错:
    # "Unicode-objects must be encoded before hashing"
    sha1.update(li_str.encode("utf8"))
    sha1_result = sha1.hexdigest()

    # 比较并返回比较结果
    if sha1_result == msg_signature:
        return True
    else:
        return False

@app.route("/",methods=["get"])
def wx_check_api():
    EncodingAESKey = ""
    token = ""

    msg_signature = request.args.to_dict().get("msg_signature")
    timestamp = request.args.to_dict().get("timestamp")
    nonce = request.args.to_dict().get("nonce")
    echostr = request.args.to_dict().get("echostr")
    # 获取消息体签名校验结果
    check_result = check_msg_signature(msg_signature,token,timestamp,nonce,echostr)
    if check_result:
        decrypt_result = msg_base64_decrypt(echostr,EncodingAESKey)
        print("通过")
        return decrypt_result
    else:
        return ""

if __name__ == "__main__":
    app.run(debug=False,host='0.0.0.0',port=6363)

0 人点赞