通过 python 获取钉钉后台人员

2022-06-14 17:43:34 浏览数 (1)

通过钉钉官方接口,可以定时同步企业人员信息、及时获知离职人员,便于认证管理系统同步,大大简化了人员账号的管理。

请求带有参数的 API 接口

注意请求接口的 IP 白名单配置情况,可参考钉钉开发文档;调用接口前需先在钉钉开放平台创建应用。

代码示例:

代码语言:python
import requests
import json
import sys
import os
# Application key and secret passed to the DingTalk gettoken endpoint.
# NOTE(review): both values are embedded in the source; swap in your own.
corpId = "35c2365db2f1378bfd74f96b7c7a529f"
corpSecret = "13dc3517caa3da9bee5e7b6b2ead8938"

# Token endpoint with both credentials carried in the query string.
api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s" % (
    corpId,
    corpSecret,
)
# Headers sent along with the token request.
headers = {
    'Content-Type': 'application/json;charset=utf-8',
}

def get_token():
    """Request an access token from the DingTalk ``gettoken`` endpoint.

    Sends a GET to the module-level ``api_url`` with the module-level
    ``headers``.

    Returns:
        The ``access_token`` value from the JSON reply, or ``None`` when the
        request fails, the HTTP status is not 200, or the body is not JSON.
        A failure message is printed in each of those cases.
    """
    try:
        # A timeout keeps the script from blocking forever on a stalled connection.
        res = requests.get(api_url, headers=headers, timeout=10)
    except requests.RequestException:
        print('请求失败')
        return None

    if res.status_code != 200:
        print('请求失败')
        return None

    try:
        payload = json.loads(res.text)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        print('请求失败')
        return None
    return payload.get('access_token')

# Script entry point: request a token when run directly (the result is not used here).
if __name__ == "__main__":
    get_token()

人名转拼音模块,暂不添加

代码语言:python
import pypinyin
# Sample names to transliterate. Bound to ``names`` rather than ``str`` so the
# built-in ``str`` stays usable in the rest of the module.
names = '崔建哲,中国,狸米,大傻,杨和苏'
# Join the pieces returned by lazy_pinyin into a single string.
pinyin_name = ''.join(pypinyin.lazy_pinyin(names))
print(pinyin_name)

分别请求部门详情、部门成员详情、角色详情,进行数据清洗,并且同步人员信息发送邮件

代码示例:

代码语言:python
#!/usr/bin/env python3
###################################################################
#  This Script is used to sync global user from Dingding to Email #
#  Date: 2019-9                                                   #
#  Author: cuijianzhe                                             #
#  Email: 598941324@qq.com                                        #
###################################################################
import requests
import json
import pypinyin
import time
import os
import xlsxwriter
from functools import wraps
# Directory holding the generated workbook and the saved roster snapshot.
filePath = '/scripts/limi/'
# Excel workbook that receives the roster; created as a module-level side effect.
workbook = xlsxwriter.Workbook(filePath + 'user_list.xlsx')
worksheet = workbook.add_worksheet()  # the single worksheet written to
worksheet.set_column('A:E', 20)  # set the width of columns A through E to 20
# Cell format for the header row.
bold = workbook.add_format({
        'bold':  True,  # bold font
        'border': 1,  # cell border width
        'align': 'center',  # horizontal alignment
        'valign': 'vcenter',  # vertical alignment
        'fg_color': '#F4B084',  # cell background colour
        'text_wrap': True,  # wrap text automatically
    })
# Cell format for the data rows.
bold1 = workbook.add_format({'align': 'center'})
heading = ['姓名','Userid','手机号','职位']  # header row labels
worksheet.write_row('A1',heading,bold)
# Base URL of the DingTalk open API.
DING_URL = "https://oapi.dingtalk.com"
# Module-level accumulators filled by dingUserSync.get_user_info():
depart_info = {}     # department name -> department id
_username = []       # user names
_usermobile = []     # user mobile numbers
_userid = []         # user ids
_userposition = []   # user job positions
def func_timer(function):
    """Decorator that prints how long the wrapped callable took.

    A start banner is printed before the call; after it returns, a completion
    banner with the elapsed seconds (two decimals) is printed.

    :param function: the callable to time
    :return: a wrapper that forwards all arguments and returns the callable's result
    """
    def timed_call(*args, **kwargs):
        print('[Function: 开始生成钉钉用户和所属部门]')
        started = time.time()
        outcome = function(*args, **kwargs)
        elapsed = time.time() - started
        print('[Function: 同步完成, 耗时: {time:.2f}s]'.format(time=elapsed))
        return outcome

    # Copy the wrapped callable's name, docstring, etc. onto the wrapper.
    return wraps(function)(timed_call)

class dingUserSync():
    """Client for the DingTalk endpoints used to collect the company roster.

    ``get_user_info`` appends its results to the module-level lists
    ``_username``, ``_userid``, ``_userposition`` and ``_usermobile`` and
    records ``department name -> department id`` in the module-level
    ``depart_info`` dict.
    """

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Double leading underscores name-mangle the app key and secret.
        self.__DING_USER = 'dingding'
        self.__DING_SECRET = 'dingsecret'
        self.headers = {'Content-Type': 'application/json;charset=utf-8'}
        # Access token, fetched lazily on first use.
        self.token = ''

    def get_token(self):
        """Request an access token.

        :return: the ``access_token`` value of the reply
        :raises RuntimeError: when the reply's ``errcode`` is not 0
        """
        url = DING_URL + '/gettoken'
        param_dict = {
            'appkey': self.__DING_USER,
            'appsecret': self.__DING_SECRET,
        }

        ding_res = requests.get(
            url, param_dict, headers=self.headers, timeout=self.REQUEST_TIMEOUT
        ).json()
        if ding_res.get('errcode') == 0:
            return ding_res.get('access_token')
        # Only BaseException subclasses can be raised, so wrap the message.
        raise RuntimeError(str(ding_res.get('errmsg')))

    def get_depart_info(self):
        '''
        Fetch the full department list.
        Example entry: {'createDeptGroup': True, 'name': '何层组', 'id': 381056019, 'autoAddUser': True, 'parentid': 127811237}

        :return: the ``department`` value of the reply
        :raises RuntimeError: when the reply's ``errcode`` is not 0
        '''
        if not self.token:
            self.token = self.get_token()
        url = DING_URL + '/department/list'
        ding_ret = requests.get(
            url, {'access_token': self.token}, timeout=self.REQUEST_TIMEOUT
        ).json()
        if ding_ret.get('errcode') == 0:
            return ding_ret.get('department')
        raise RuntimeError('错误信息为:%s,尝试刷新token信息' % ding_ret.get('errmsg'))

    @staticmethod
    def pinyin_transfer(name=None):
        """Return ``name`` transliterated to pinyin, or ``''`` for an empty/None name."""
        if not name:
            return ''
        return ''.join(pypinyin.lazy_pinyin(name))

    def get_depart_user(self, depart_id):
        """Fetch the members of one department.

        :param depart_id: id of the department to query
        :return: the ``userlist`` value of the reply, e.g.
            [{'name': '宫琳', 'userid': '054940691023503691'}, {'name': '张浩', 'userid': '084637391724299533'}]
        :raises RuntimeError: when the reply's ``errcode`` is not 0
        """
        if not self.token:
            self.token = self.get_token()
        param_dict = {
            'access_token': self.token,
            'department_id': depart_id
        }
        url = DING_URL + '/user/simplelist'
        depart_user_res = requests.get(
            url, param_dict, timeout=self.REQUEST_TIMEOUT
        ).json()
        if depart_user_res.get('errcode') == 0:
            return depart_user_res.get('userlist') or []
        # Fail loudly: silently dropping a department would make its members
        # look like they had left the company.
        raise RuntimeError(
            '错误信息为:%s,部门ID:%s' % (depart_user_res.get('errmsg'), depart_id)
        )

    def get_user_info(self):
        """Collect name, id, position and mobile of every user into the module lists.

        Every department returned by ``get_depart_info`` is queried, so
        departments sharing a name are not skipped. A user id that appears in
        several departments is recorded once, which keeps the four lists
        aligned by index.
        """
        if not self.token:
            self.token = self.get_token()
        url = DING_URL + '/user/get?'
        departments = self.get_depart_info()
        for depart in departments:
            depart_info[depart.get('name')] = depart.get('id')

        seen_ids = set(_userid)
        new_ids = []
        for depart in departments:
            for user in self.get_depart_user(depart.get('id')):
                userid = user.get('userid')
                if userid in seen_ids:
                    continue
                seen_ids.add(userid)
                new_ids.append(userid)
                _username.append(user.get('name'))
                _userid.append(userid)

        # Look up details only for ids added by this call.
        for userid in new_ids:
            param_dict = {
                'access_token': self.token,
                'userid': userid
            }
            userinfo_res = requests.get(
                url, param_dict, timeout=self.REQUEST_TIMEOUT
            ).json()
            _userposition.append(userinfo_res.get('position'))
            _usermobile.append(userinfo_res.get('mobile'))

def getNameInfo():
    """Fetch the roster, write the workbook, and diff against the saved snapshot.

    The roster is written to columns A-D of the module-level worksheet and the
    workbook is closed. The current names are then compared with the
    comma-separated snapshot file ``userOld`` in ``filePath``, and the
    snapshot is overwritten with the current names.

    Returns:
        tuple: ``(addName, leaveName, oldListName)`` - names present now but
        not in the snapshot, names in the snapshot but not present now, and
        the snapshot's names. When no snapshot file exists yet, the snapshot
        is treated as empty, so every current name is reported as added.
    """
    dingUserSync().get_user_info()
    worksheet.write_column('A2', _username, bold1)
    worksheet.write_column('B2', _userid, bold1)
    worksheet.write_column('C2', _usermobile, bold1)
    worksheet.write_column('D2', _userposition, bold1)
    workbook.close()

    snapshot_path = filePath + 'userOld'
    if os.path.isfile(snapshot_path):
        with open(snapshot_path, 'r', encoding='utf-8') as oldname:
            # Every name is followed by a comma, so the last split piece is empty.
            oldListName = oldname.read().split(',')[:-1]
    else:
        oldListName = []

    # Sets give constant-time membership tests; the lists keep their order.
    current_names = set(_username)
    previous_names = set(oldListName)
    addName = [name for name in _username if name not in previous_names]  # joiners
    leaveName = [name for name in oldListName if name not in current_names]  # leavers

    # Refresh the snapshot in the same "name," format it is read back in.
    with open(snapshot_path, 'w', encoding='utf-8') as new_name:
        new_name.write(''.join('%s,' % name for name in _username))
    return addName, leaveName, oldListName

@func_timer
def _email():
    """Build the daily roster report and send it by mail.

    Calls ``getNameInfo`` for the joiner/leaver lists, puts the counts and
    names in the plain-text body, attaches ``user_list.xlsx`` from
    ``filePath``, and sends the message over SMTP-over-SSL.
    """
    import smtplib
    from email.mime.application import MIMEApplication
    from email.mime.text import MIMEText
    from email.utils import formataddr
    from email.mime.multipart import MIMEMultipart
    date = time.strftime('%Y-%m-%d',time.localtime())
    # NOTE(review): sender, password and recipients are embedded in the source.
    my_sender = 'noreply@163.com'
    my_pass = 'mypassword'
    my_user = ['noreply@qq.com',]
    # Fall back to empty lists if getNameInfo has nothing to report.
    UserList = getNameInfo() or ([], [], [])
    body = """
    日期:%s,昨日入职%s人,名单:(%s),离职%s个,名单:(%s),昨天公司总人数%s个,现在公司总人数%s个.
    """%(date,len(UserList[0]),','.join(UserList[0]),len(UserList[1]),','.join(UserList[1]),len(UserList[2]),len(_username))
    msg = MIMEMultipart()
    # Binary attachment: MIMEApplication base64-encodes the bytes and sets a
    # single application/octet-stream Content-Type header.
    with open(filePath + 'user_list.xlsx', 'rb') as attachment_file:
        fujian = MIMEApplication(attachment_file.read(), _subtype='octet-stream')
    fujian.add_header('Content-Disposition', 'attachment', filename='user_list.xlsx')
    msg.attach(MIMEText(body, 'plain', 'utf-8'))
    msg.attach(fujian)
    msg['From'] = formataddr(("limiUserRsync", my_sender))
    msg['To'] = ','.join(my_user)
    msg['Subject'] = '人员同步'
    # The context manager closes the connection even if login or sending fails.
    with smtplib.SMTP_SSL("smtp.163.com", 465) as server:
        server.login(my_sender, my_pass)
        server.sendmail(my_sender, my_user, msg.as_string())

# Script entry point: build and send the roster report when run directly.
if __name__ == "__main__":
    _email()

同步一些工具可行可改可适配可认证……

查看邮件信息(附件构造为 Excel 或者 txt 文本文件):

  • 表格附件构造参考 xlsxwriter 模块

标题:通过 python 获取钉钉后台人员

作者:cuijianzhe

地址:https://cloud.tencent.com/developer/article/2022762

0 人点赞