2023年3月1号,OpenAI正式发布了chatGPT的第一版接口,得益于api,势必会冒出大量国产chatGPT应用,本教程以官方教程为例,以简单的代码介绍如何使用python redis django将chatGPT功能以http协议的形式集成至公众号或者网页、手机应用中,核心代码仅100余行,部分代码更是因为微信公共号限制而增加的处理。
效果展示
这是本文代码实现的具体效果,如果想知道该效果是否满足你的需求,可以直接关注公众号【智子不智】回复尝试,测试一下是否符合自己的需求,如果符合的话可以继续往下看。
另外计划下周会更新将stable difussion集成进公众号的方案也写一下,有兴趣的话可以mark一下,地址会后续贴到本文中。
接下来文章将会先介绍相关代码,由于实际应用场景不同,作为样例的实际部署代码至微信公众号,会放到文章后半部分。
OpenAI样例代码解读
代码语言:javascript复制messages = []
system_msg = input("What type of chatbot would you like to create?n")
messages.append({"role": "system", "content": system_msg})
print("Say hello to your new assistant!")
while input != "quit()":
message = input("")
messages.append({"role": "user", "content": message})
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages
)
replay = response["choices"][0]["message"]["content"]
messages.append({"role": "system", "content": replay})
print('n' replay 'n')
如果提示openai.ChatCompletion不存在请将openai升级至2.7.0
代码语言:javascript复制pip install --upgrade openai
使用接口openai.ChatCompletion.create,指定模型版本model="gpt-3.5-turbo",就可以根据历史对话信息messages,获取新的对话信息。由于chatGPT是一个多轮对话模型,因此当需要考虑上下文的时候需要将以前的对话信息全部放入接口的messages参数中。
由实例代码可知,只需要有效存储用户的历史对话数据即可直接通过API获得下一轮输出,如果你的应用没有任何限制,直接使用上述代码即可。
本文使用redis来作为用户历史对话信息存储媒介,除此之外,也可以将回复结果缓存至redis,来应对一些有限制的场景,如微信公众号限制回复时间为5s,最多重试三次,对于目前语言生成模型来说,几千字的输出很难在这个时间内完成,因此需要某些方法避免无法获取结果。
数据管理类
代码语言:javascript复制class MessageManager:
"""
用于管理接受到信息的回复,微信最多三次15s
"""
def __init__(self, message: TextMessage, redis_handle):
self.source = message.source
self.target = message.target
self.content = message.content
self.msg_id = str(message.id)
self.redis_handle = redis_handle
以微信公众号收到的数据结构为例,TextMessage是wechat解析的数据对象,一个标准的数据如下:
代码语言:javascript复制TextMessage({'ToUserName': 'gh_a3752fb772', 'FromUserName': 'oBime5wlI7qq3sJg45p1gV4mvY', 'CreateTime': '1677811161', 'MsgType': 'text', 'Content': '你好', 'MsgId': '240204787191038'})
包含公众号的id,发起消息的用户id,消息内容以及消息ID。在MessageManager中分别被解析为target、source、content和msg_id,redis_handle是一个以及连接redis数据库的句柄,用来对数据库进行访问。
根绝公众号的id和发起消息的用户id,可以唯一确定它们的历史对话信息,消息ID可以在进行三次重传时确定是否是之前已发送过的请求。通过redis可以记录数据是第几次请求。
代码语言:javascript复制@property
def query_times(self):
msg_query_times = self.redis_handle.get(self.msg_id)
if msg_query_times is None:
self.redis_handle.set(self.msg_id, 2)
return 1
else:
msg_query_times = int(msg_query_times)
self.redis_handle.get(self.msg_id)
self.redis_handle.set(self.msg_id, msg_query_times 1)
return msg_query_times
使用属性query_times记录消息查询次数,每次查询都在redis中将查询次数加一。
程序的关键在于,将二、三次的重试转换成对历史结果的查询,如果三次查询(约15s)还不能获取到结果,只能要求用户重新发送查询信息,只要和上次查询信息相同,我们同样可以取出结果返回给用户,实现越过5s和3次重传的限制。关键的只在第一次请求时调用api和判断请求是否和上次相同(msg_id不同)的代码如下:
代码语言:javascript复制@property
def last_query(self):
history_msgs = self.redis_handle.get(self.redis_history_key)
if history_msgs is None:
return ''
history_msgs = json.loads(history_msgs)
if len(history_msgs) >= 2:
return history_msgs[-2]['content']
def get_response(self):
query_times = self.query_times
print(f'times{query_times}')
# 只有第一次是实际请求获得回答
if query_times == 1:
# 判断是否和上次请求相同,如果相同则返回上次请求
if self.last_query == self.content and self.redis_last_response:
return self.redis_last_response
response = self.get_chatgpt_response()
self.redis_save_response(response)
return self.redis_response if self.redis_response is not None else ''
# 查询redis里面是否已经有生成的回复信息了,并会重试一次
if query_times == 2:
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
return self.redis_response if self.redis_response is not None else ''
# 如果是第三次请求,就直接告诉用户超时了
if query_times == 3:
time.sleep(3)
return self.redis_response if self.redis_response is not None else
"生成结果时间过长导致公众号未能正常返回,请稍后使用相同的消息获取回复信息。"
return "生成结果时间过长导致公众号未能正常返回,请稍后使用相同的消息获取回复信息。"
最后完整的数据管理代码如下:
代码语言:javascript复制from wechatpy.messages import TextMessage
import openai, os
import redis
import json
from typing import List
import time
class MessageManager:
"""
用于管理接受到信息的回复,微信最多三次15s
"""
def __init__(self, message: TextMessage, redis_handle):
self.source = message.source
self.target = message.target
self.content = message.content
self.msg_id = str(message.id)
self.redis_handle = redis_handle
@property
def redis_response_key(self):
return f"{self.source}_{self.target}_{self.msg_id}"
@property
def redis_history_key(self):
return f"{self.source}_{self.target}"
@property
def last_response_key(self):
return f"{self.source}_{self.target}_last"
@property
def redis_last_response(self):
try:
last_response = self.redis_handle.get(self.last_response_key).decode('utf-8')
except Exception as e:
print(e)
last_response = ''
if last_response is not None:
return last_response
else:
return ''
@property
def query_times(self):
msg_query_times = self.redis_handle.get(self.msg_id)
if msg_query_times is None:
self.redis_handle.set(self.msg_id, 2)
return 1
else:
msg_query_times = int(msg_query_times)
self.redis_handle.get(self.msg_id)
self.redis_handle.set(self.msg_id, msg_query_times 1)
return msg_query_times
@property
def api_query_msgs(self):
"""
历史信息只保留最多十条,由于是连续对话,即用户加chatGPT的共20条消息,消息数量由redis处理处保证
"""
# chatGPT角色信息,一直作为第一条信息
msgs = [{"role": "system", "content": "无所不知的人"}]
msgs = self.user_history_msgs
msgs.append({"role": "user", "content": self.content})
return msgs
def get_chatgpt_response(self):
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=self.api_query_msgs
)
replay = response["choices"][0]["message"]["content"]
return replay
def redis_save_response(self, response):
total_msgs = self.api_query_msgs [{"role": "system", "content": response}]
# 保存当前回复的答案
self.redis_handle.set(self.redis_response_key, response)
# 并作为最后一次回复存储
self.redis_handle.set(self.last_response_key, response)
# 保存历史信息
total_msgs_str = json.dumps(total_msgs[-20:])
self.redis_handle.set(self.redis_history_key, total_msgs_str)
def get_response(self):
query_times = self.query_times
print(f'times{query_times}')
# 只有第一次是实际请求获得回答
if query_times == 1:
# 判断是否和上次请求相同,如果相同则返回上次请求
if self.last_query == self.content and self.redis_last_response:
return self.redis_last_response
response = self.get_chatgpt_response()
self.redis_save_response(response)
return self.redis_response if self.redis_response is not None else ''
# 查询redis里面是否已经有生成的回复信息了,并会重试一次
if query_times == 2:
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
if self.redis_response is None:
time.sleep(2)
return self.redis_response if self.redis_response is not None else ''
# 如果是第三次请求,就直接告诉用户超时了
if query_times == 3:
time.sleep(3)
return self.redis_response if self.redis_response is not None else
"生成结果时间过长导致公众号未能正常返回,请稍后使用相同的消息获取回复信息。"
return "生成结果时间过长导致公众号未能正常返回,请稍后使用相同的消息获取回复信息。"
@property
def redis_response(self):
response = self.redis_handle.get(self.redis_response_key)
if response is not None:
return response.decode('utf-8')
return None
@property
def user_history_msgs(self):
if not hasattr(self, '_user_history_msgs'):
self._user_history_msgs = self.get_history_msgs()
return self._user_history_msgs
@property
def last_query(self):
history_msgs = self.redis_handle.get(self.redis_history_key)
if history_msgs is None:
return ''
history_msgs = json.loads(history_msgs)
if len(history_msgs) >= 2:
return history_msgs[-2]['content']
def get_history_msgs(self) -> List[dict]:
history_msgs = self.redis_handle.get(self.redis_history_key)
if history_msgs is None:
return []
history_msgs = json.loads(history_msgs)
# 如果当前问题和之前问题一样可能是重复回答了,删除之前的回答
if history_msgs[-2]['content'] == self.content:
history_msgs = history_msgs[:-2]
return history_msgs
部署至微信公众号
在有上述数据管理类之后,核心代码即配合wechatpy将接收到的信息处理后返回,由于使用的是django,在配置好访问路径后关键代码如下:
代码语言:javascript复制def response_query(self, request):
msg = parse_message(request.body)
message_manager = MessageManager(msg, self.redis_handle)
chatgpt_res = message_manager.get_response()
# 对于过长的回复微信公众号会提示错误,截断到1024
# 这里我觉得不用解决,客户可以提问时要求chatGPT分段返回
reply = create_reply(chatgpt_res[:1024], message=msg)
response = HttpResponse(reply.render(), content_type='application/xml')
return response
微信公众号完整部署步骤
后续会单独写一篇
后记
chatGPT还存在很多明显的问题,我并不看好当前它的应用,但是它让我看到AI是能真正融入人们生活的,等到公司自研的大模型ready的时候会再次更新。
如果有问题可以留言,上线后会尽力回复