本文分为三部分,
第一部分主要内容:滴答清单的token获取,数据获取,通过网页API获取数据并分析,滴答清单的任务增删改查。
第二部分主要内容:zoho CRM token获取,数据获取,数据增删改查
第三部分主要内容:API建立,fastAPI的基本使用和注意事项
简单逻辑图,主要是对数据库的增删改查,来保证双方的信息的同步一致性。
本文将以Python作为开发语言,用到的Python的库如下,请确认自行安装库,
代码语言:text复制FastAPI
BaseModel
datetime
requests
pymssql
uvicorn
Python库说明:
FastAPI:创建API使用,在CRM端,用户创建或者新建信息资料时,呼叫API,并将资料写入滴答清单
BaseModel:定义API JSON 模型,如定义该API JSON有那些字段信息,
datetime:处理数据过程中的时间日期问题
requests:请求库,在程序中主要使用的库,用于API请求和发送。
pymssql:数据库连接库
uvicorn:API启动监控
滴答清单篇
滴答端API获取token
请按照以下网址要求,生成API令牌:
https://developer.dida365.com/docs#/openapi
大家可能在请求的过程中有疑问,我这边请求过程整理成文档,可以进行参考
地址:https://cloud.tencent.com/developer/article/2368106
因为滴答官方API功能有些欠缺,没有指派担当的功能,,故在此基础上,通过网页端获取新增任务的接口,用来指派担当,
滴答代码编写
准备工作:
请严格按照如下操作进行
创建一个名为dida的文件,内容如下:
第一行是你的滴答清单的cookie
第二行是你的滴答清单的令牌
第三行如果你用的是国际版请进行变更,如果是大陆的就不要做变更
代码语言:text复制cookie地址
令牌
api.dida365.com
以上是对相关信息做持久化
创建一个dida.py
代码语言:python代码运行次数:0复制import requests
class di():
def __init__(self):
self.cookie = self.readcookie()
self.token = self.readtoken()
self.host = self.readhost()
self.webhead = self.webhead
self.datajson = None
self.apiurl = 'https://api.dida365.com/api/v2/batch/task'
以上文件创建了一个di类,
并设定6个成员变量,
以上信息分别对成员变量进行赋值操作,cookie,token,host分别定义三个函数读取之前我们创建的文档的内容并赋值给成员变量。
webhead:是网页的请求头,请求头需要有Cookie和User-Agent内容,故单独进行赋值
datajson:用来存放我们获取到滴答的所有信息值,方便后面进行提取数据
apiurl:为网页版请求地址
以下是对我们之前创建的文档并赋值的函数,做参考
代码语言:Python复制 def webhead(self):
header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
"Cookie": f"{self.cookie}"
}
return header
def readcookie(self):
with open("dida", "r") as file:
f = file.readlines()
file.close()
cook = f[0].replace("n", "")
return cook
def readtoken(self):
with open("dida", "r") as file:
f = file.readlines()
file.close()
ken = f[1].replace("n", "")
return ken
def readhost(self):
with open("dida", "r") as file:
f = file.readlines()
file.close()
return f[2]
分析滴答API文档,写了一个关于滴答请求的函数,可以对滴答清单的任务做获取,删除,注意:如果是post 的请求,后面需要跟一个JSON参数,也就是我们需要更新或者新增的任务信息
代码语言:python代码运行次数:0复制 def apididarequest(self,type:str,endurl:str,**kwargs):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36",
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
if type == "get" or type == "GET":
url = f"https://{self.host}{endurl}"
r = requests.get(url=url,headers=headers)
return r.json()
elif type == "post" or type == "POST":
if "JSON" in kwargs:
url = f"https://{self.host}{endurl}"
r = requests.post(url=url,headers=headers,json=kwargs['JSON'])
print(r.status_code)
return r.json()
else:
url = f"https://{self.host}{endurl}"
r = requests.post(url=url, headers=headers)
print(r.status_code)
return r.status_code
elif type == "DELETE" or type == "delete":
url = f"https://{self.host}{endurl}"
r = requests.delete(url=url, headers=headers)
return r
输入两个参数,第一个是类型,判断是get还是post请求,第二个是请求的Url地址,注意url,地址不包括host。
获取所有数据
代码语言:python代码运行次数:0复制 def webapi(self,projectId):
url = f"https://api.dida365.com/api/v2/project/{projectId}/tasks"
r = requests.get(url=url,headers=self.webhead())
self.datajson = r.json()
return self
以上的函数带入滴答的项目ID,请按照自己的项目ID进行参数填写,注意url后面的地址,projectId的位置是网页端你自己的projectId,复制即可,
以上执行完成后会将信息赋值给datajson,并返回self,方便链式调用
一下是我按照我需要的字段做了整理,数据以列表的方式进行存储,主要是按照status的数值来判断关闭和没有关闭的任务
代码语言:python代码运行次数:0复制 # 获取清单中未关闭的任务
def ownertaskinfo(self):
Allinfo = list()
datajson = self.datajson
for data in datajson:
status = data.get("status")
iid = data.get("id")
title = data.get("title")
content = data.get("content")
modifiedtime = data.get("modifiedTime")
creator = data.get("creator")
assignee = data.get("assignee")
parentId = data.get("parentId")
if assignee is None:
assignee = creator
if status != 2:
info = []
info.append(iid)
info.append(title)
info.append(content)
info.append(modifiedtime)
info.append(creator)
info.append(assignee)
info.append(parentId)
info.append(status)
Allinfo.append(info)
return Allinfo
代码语言:python代码运行次数:0复制if __name__ == "__main__":
d = di()
task = d.webapi("dd6d985e9c235611b74d2430").ownertaskinfo()
for i in task:
print(i)
以上对打印出所有的位于dd6d985e9c235611b74d2430的所有任务并迭代打印出来
以上对滴答清单做数据演示获取,
请按照自己需要的数据自行定义。
以上是获取滴答的数据的整个过程
ZohoCRM篇
与滴答清单一样,我们需要对主要信息做持久化
建立两个文件,第一个文件是refresh.txt,主要内容
说明:请按官网地址:https://www.zoho.com.cn/crm/help/developer/api/get-records.html
操作获取client_id,client_secret,refresh_token
代码语言:text复制client_id=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
client_secret=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
refresh_token=xxxxxxxxxxxxxxxxxxxxxxxxxxx
第二个文件名称为access_token_file,
获取完refresh_token后,我们根据操作获取刷新令牌,有效期是1个小时,并将刷新令牌的信息和时间写入文件,时间主要是用来判断刷新令牌是否有效
代码语言:text复制xxxxxxxxxxxxxxxxxxxxx=2023-12-04 15:16:37
创建一个zohotoken.py文件,创建一个zoho类,并对主要信息做判断并赋值
代码非常简单:
主要是读取文档中的信息,并将字符串给成员变量,需要注意的是access_token主要是对时间进行判断是否在1个小时之内,如果不在就重新获取并将最新的信息写入文件
全部代码如下:
代码语言:python代码运行次数:0复制import requests
import datetime
class zoho():
def __init__(self):
self.client_id = self.client()
self.client_secret = self.secret()
self.refresh_token = self.token()
def client(self):
with open("refresh.txt","r") as clientas :
clienta = clientas.readlines()
client_id = clienta[0].split("=")
return client_id[1].strip()
def secret(self):
with open("refresh.txt","r") as secretfile:
secretline = secretfile.readlines()
client_secret = secretline[1].split("=")
return client_secret[1].strip()
def token(self):
with open("refresh.txt","r") as t:
ts = t.readlines()
refresh_token = ts[2].split("=")
return refresh_token[1].strip()
def wirte_access_token(self):
client_id = self.client_id
client_secret = self.client_secret
refresh_token = self.refresh_token
# 构建请求
url = "https://accounts.zoho.com.cn/oauth/v2/token"
payload = {
"grant_type": "refresh_token",
"client_id": client_id,
"client_secret": client_secret,
"refresh_token": refresh_token
}
# 发送请求
response = requests.post(url, data=payload)
# 解析响应
if response.status_code == 200:
access_token = response.json()["access_token"]
with open("access_token_file", "w") as f:
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(str(access_token) "=" str(current_time))
f.close()
return access_token
def access_token(self):
with open("access_token_file","r") as f:
access = f.readline().split("=")
# 文件中的日期
olddatetime = access[1]
# 对文件中的日期进行类型转换
olddatetime_date = datetime.datetime.strptime(olddatetime,"%Y-%m-%d %H:%M:%S")
# 现在的时间日期
now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 转换现在日期的时间格式
now_time_datatime = datetime.datetime.strptime(now_time,"%Y-%m-%d %H:%M:%S")
# 给文件中的日期加上3600秒,因为access_token的有效期是3600,以此时间来判断
olddatetime_three_thousand = olddatetime_date datetime.timedelta(seconds=3600)
if olddatetime_three_thousand > now_time_datatime:
f.close()
return access[0]
else:
f.close()
return self.wirte_access_token()
获取zoho CRM信息,
代码如下:
代码语言:python代码运行次数:0复制from zohotoken import zoho
import requests
class zohoselect(zoho):
# 获取模块的所有信息
def SelectModules(self,modules):
url = f"https://www.zohoapis.com.cn/crm/v2/{modules}"
headers = {
"Authorization": f"Zoho-oauthtoken {zoho.access_token(self)}",
"Content-Type": "application/json"
}
response = requests.get(url, headers=headers)
records = response.json()
return records["data"]
# 获取指定模块下的视图信息
def SelectModulesCvid(self,modules,cvid:int):
url = f"https://www.zohoapis.com.cn/crm/v2/{modules}?&cvid={cvid}"
headers = {
"Authorization": f"Zoho-oauthtoken {zoho.access_token(self)}",
"Content-Type": "application/json"
}
response = requests.get(url, headers=headers)
records = response.json()
return records["data"]
# 获取模块视图下的资料,超过200条使用
def SelectModulesCvid_Page(self,modules,cvid:int,page:int,per_page:int):
url = f"https://www.zohoapis.com.cn/crm/v2/{modules}?&cvid={cvid}&page={page}&per_page={per_page}"
headers = {
"Authorization": f"Zoho-oauthtoken {zoho.access_token(self)}",
"Content-Type": "application/json"
}
response = requests.get(url, headers=headers)
records = response.json()
return records["data"]
# 获取某一条资料信息
def related_one(self,modules,record_id:str):
url = f"https://www.zohoapis.com.cn/crm/v2/{modules}/{record_id}"
headers = {
"Authorization": f"Zoho-oauthtoken {zoho.access_token(self)}",
"Content-Type": "application/json"
}
response = requests.get(url, headers=headers)
records = response.json()
return records["data"]
以上代码是根据官方提供的API文档写的代码,以有给出实例,按照我这个方式进行,只需要按照官方提供的API文档替换相应的URL,和参数即可。
调用:
代码语言:python代码运行次数:0复制if __name__ == "__main__":
z = zohoselect()
Dep = z.related_one("Even","18422300000712345")
for i in Dep:
print(i)
以上代码用来获取Even模块的下的id为18422300000712345资料并迭代出所有内容
数据库操作
请按照自己需要的字段定义数据库字段,
以下是连接获取sql server的代码示例
代码语言:python代码运行次数:0复制import pymssql
# 设置连接参数
server = '192.168.1.91'
database = 'dida'
username = 'dida'
password = 'K5PkYSn'
mydb = pymssql.connect(server, username, password, database)
def selectsql():
mycursor = mydb.cursor()
mycursor.execute("SELECT * FROM crmtodida")
result = mycursor.fetchall()
return result
def into(crmname,crmid,crmregion,crmsource,didaid,didataskid,Owner,columnId):
mycursor = mydb.cursor()
sql = "INSERT INTO [crmtodida] (crmname,crmid,crmregion,crmsource,didaid,didataskid,Owner,columnId) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
val = (crmname,crmid,crmregion,crmsource,didaid,didataskid,Owner,columnId)
mycursor.execute(sql, val)
mydb.commit()
mycursor.close()
return mycursor.rowcount
以上知识对数据库操作进行演示,增删改查请以自己的需求为准。
请自行更换相应数据库信息
server、database、username、password
API建立
准备工作:
1.域名
2.SSL证书文件key和pem文件
创建一个main文件
内容如下:
代码语言:python代码运行次数:0复制from fastapi import FastAPI
from pydantic import BaseModel
d = di()
app = FastAPI()
class Project_Management(BaseModel):
iid :str
title :str
dep :str
Owner :str
st :str
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.post("/Project_Management/")
async def Project_Management(project_Management:Project_Management):
projectid = "656fd3c9e4b0219f4"
iid = project_Management.iid
title = project_Management.title
dep = project_Management.dep
Owner = project_Management.Owner
st = project_Management.st
Project = selectyid(Dealsid=iid)
owneremail = selectuser(useremail=Owner)
print(iid, title, dep,Owner,st)
if "台中業務課" in dep:
if Project == None:
taskid = createtask(title, projectid).get("id")
d.addtask(owneremail, "", taskid, projectid, title, "")
into(title, iid, "", "共有专案", projectid, taskid, "xxx@mxxx.com.tw", "0")
else:
if "結案歸檔" in st:
d.taskclose(projectid,Project)
dele(Project)
else:
d.addtask(owneremail, "", Project, projectid, title, "")
return {"Project_Management_data": title}
代码说明:
class Project_Management(BaseModel)定义了一个请求的json模型
@app.get("/")和@app.post("/Project_Management/")为装饰器,定义了这个请求的类型和路径app.类型,以上分别创建了一个get和post的请求,
在使用post请求时,需要用到json模型,定义用来接受时的参数信息,
在相应请求下面写出自己的逻辑
启用API:
打开cmd切换到相应目录,输入如下内容:
main:app 为API文件名
--host:设置为0.0.0.0,允许外部任何地址访问
--port 定义端口号
--sslkeyfile和--ssl-certfile:分别是SSL证书文件,用来加密请求
代码语言:text复制uvicorn main:app --host 0.0.0.0 --port 8000 --ssl-keyfile="server.key" --ssl-certfile="server.pem" --reload
至此同步过程搭建完毕,请按照自己实际情况进行完善,如,同步时间,CRM端呼叫带入的参数,带入的参数要与定义的模型一直。
我正在参与2023腾讯技术创作特训营第四期有奖征文,快来和我瓜分大奖!