(adsbygoogle = window.adsbygoogle || []).push({});
起因
因为之前白嫖了一个E5的5T想着不做网盘真的是白瞎了,但是由于oneindex年久失修,最后选择了SpencerWoo大佬的onedrive-vercel-index,但是由于是托管在vercel上面的,没办法像oneindex那样上传文件,就导致我很烦,于是便综合起来网上的教程和微软的Azure Active Directory文档写了一个基于onedrive for business的小例子。
开发流程
- 向 Azure Active Directory (AAD) 注册的客户端 ID 和密钥(客户端密码)
- 从 OAuth 2 授权代码流收到的授权代码
- OneDrive for Business API 终结点 URL
- OneDrive for Business 资源的访问令牌
- 在当前令牌到期时生成其他访问令牌的刷新令牌。
开始对接
客户端id和密钥使用过oneindex的小伙伴应该都知道怎么设置,这里就不多介绍,除此之外设置一个回调的uri就可以了,我使用的是“localhost:8400”
获取access_token
onedrive for business使用的是标准的Oauth2流程,所以大概流程就是先获取code,使用code交换access_token,然后就可以调用api了,这里先贴出获取code以及交换access_token的代码:
代码语言:javascript复制def get_token(self, url):
code = parse_qs(urlparse(url).query).get('code')[0]
data = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'client_secret': self.client_secret,
'code': code,
'grant_type': 'authorization_code',
'resource': self.resource_uri
}
resp = requests.post(self.oauth2_uri, headers=self.header, data=data).json()
return self.save_token(resp)
def refresh_token(self):
token = self.read_token(only_read=True)
data = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'client_secret': self.client_secret,
'refresh_token': token['refresh_token'],
'grant_type': 'refresh_token',
'resource': 'https://graph.microsoft.com'
}
resp = requests.post(self.oauth2_uri, headers=self.header, data=data).json()
return self.save_token(resp)
onedrive上传文件
获取到了token之后,就可以去调用onedrive for business相关的代码了,由于大于4MB的文件需要创建会话去分片上传,所以这里我写了两个上传方法,大概代码如下:
代码语言:javascript复制def get_path(self, path, op):
if path[0] == '/': path = path[1:]
if path[-1] == '/': path = path[:-1]
if op[0] == '/': op = op[1:]
return self.onedrive_uri '/root:/backup/{}:/{}'.format(path, op)
def create_folder(self, path):
path = list(filter(None, path.split('/')))
pa = '/'.join(path[:len(path) - 1])
name = path[len(path) - 1]
data = json.dumps({
"name": name,
"folder": {},
})
r = requests.post(self.get_path(pa, 'children'), headers=self.header, data=data)
return r.status_code
def upload_url(self, path, conflict="fail"):
r = requests.post(self.get_path(
path, 'createUploadSession'
), headers=self.header)
if r.status_code == 200:
return r.json()['uploadUrl']
else:
return ""
def upload_file(self, path, data):
size = len(data)
if size > 4000000:
return self.upload_big_file(path, data)
else:
r = requests.put(self.get_path(path, 'content'), headers=self.header, data=data)
if 'error' in r:
return "上传失败"
return "上传成功"
def upload_big_file(self, path, data):
url = self.upload_url(path)
if url == "":
return "上传取消"
size = len(data)
chunk_size = 3276800
file_name = path.split('/')[len(path.split('/')) - 1]
pbar = tqdm(total=size, leave=False, unit='B', unit_scale=True, desc=file_name)
for i in range(0, size, chunk_size):
chunk_data = data[i:i chunk_size]
pbar.update(len(chunk_data))
r = requests.put(url, headers={
'Content-Length': str(len(chunk_data)),
'Content-Range': 'bytes {}-{}/{}'.format(i, i len(chunk_data) - 1, size)
}, data=chunk_data)
if r.status_code not in [200, 201, 202]:
print("上传出错")
break
遇到的几个坑
- 终结点是
https://graph.microsoft.com
- onedrive的请求api是
https://graph.microsoft.com/v1.0/me/drive
,但是文档中以及网上教程写的是https://graph.microsoft.com/me/drive
,这也是我认为比较坑的一点 - secret需要复制“值”,而不是“机密ID”
完整代码
此处内容需要评论回复后方可阅读
使用实例
1.配置self.client_id
以及self.client_secret
2.在其他的文件中引入one
,也可以引入onedrive自己实例化
from onedrive import one
3.上传文件
代码语言:javascript复制if __name__ == '__main__':
# 上传至onedirve的路径
remote = '/uploads/images/logo.png'
# 本地文件路径
file = os.getcwd() '/images/logo.png'
with open(file, 'rb') as f:
# 小文件会打印“上传成功”,大文件会显示上传进度条
print(one.upload_file(remote, f.read()))
如无特殊说明《onedrive for business使用python上传文件》为博主MoLeft原创,转载请注明原文链接为:https://moleft.cn/post-276.html