代码语言:javascript复制
# -*- coding: utf-8 -*-
# author : s
import random,string
from ldap3 import Server,Connection,ALL,SUBTREE,ALL_ATTRIBUTES,MODIFY_REPLACE,MODIFY_ADD,MODIFY_DELETE
from passlib.hash import ldap_salted_sha1 as ssha
from app.utils import ErrMsg
class LdapOp(object):
"""
Operation Dcouments: http://ldap3.readthedocs.io/
"""
def __init__(self,ip,port,user,passwd):
self._ip = ip
self._port = port
self._user = user
self._passwd = passwd
self.dn = self._user.split(',',1)[1]
self.s = Server(self._ip,self._port,get_info=ALL)
self._conn = Connection(self.s, self._user, self._passwd, auto_bind=True)
@property
def conn(self):
if not self._conn:
print('ldap conn init ')
self._conn = Connection(self.s, self._user, self._passwd, auto_bind=True)
return self._conn
def searchAll(self,name='top'):
entries = self.conn.extend.standard.paged_search(self.dn, '(objectClass=%s)' % name ,attributes=['cn', 'givenName','gidNumber','uidNumber','telephoneNumber','mail'])
en = list(entries)
ulist = [v for v in en if 'People' in v.get('dn','') and 'uid' in v.get('dn','') ]
l = []
for v in ulist:
udict = {}
udict['dn'] = v['dn']
udict['cn'] = v['attributes'].get('cn','')[0] if len(v['attributes'].get('cn','')) > 0 else v['attributes'].get('cn','')
udict['gid'] = str(v['attributes'].get('gidNumber',''))
udict['mail'] = v['attributes'].get('mail','')[0] if len(v['attributes'].get('mail','')) > 0 else v['attributes'].get('mail','')
udict['phone'] = v['attributes'].get('telephoneNumber','')[0] if len(v['attributes'].get('telephoneNumber','')) > 0 else v['attributes'].get('telephoneNumber','')
l.append(udict)
# append user is groups
groups = self.searchGroups()
## 添加 组 到 用户的连接
for user in l:
user['groups'] = []
for group in groups:
# group.get('memberUid',[])
if user.get('cn', '') in group.get('memberUid', []):
user['groups'].append(group.get('cn'))
#print(l)
return l
def getGroups(self):
l = []
return l
def user_addGroup(self, group_dn, user_cn=[]):
data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_ADD, user_cn)]})
return self.conn.entries
def user_deleteGroup(self,group_dn,user_cn=[]):
data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_DELETE,user_cn)]})
print(data)
return self.conn.entries
def addGroup(self,args):
ouname = self.get_ouname('(|(ou=Groups)(ou=Group))')
print('ouname=',ouname)
groupname = args.get('name')
groupid = args.get('groupid')
#grouppassword = args.get('grouppassword')
cndn = 'cn=%s,%s' % (groupname,ouname)
print(cndn)
attr = {
'objectClass': ['posixGroup', 'top'],
'cn' : groupname,
'gidNumber': groupid,
#'userPassword': grouppassword
}
sgroup = [ g for g in self.searchGroups() if g.get('cn','') == groupname or g.get('gidNumber','') == groupid ]
if sgroup:
return {'msg': 'err', 'data': u' 组名字或组ID重复,请检查后重新添加!'}
try:
data = self.conn.add(cndn, attributes=attr)
msg = 'ok'
except Exception as e:
data = e
msg = 'err'
finally:
#self.conn.unbind()
return {'msg': msg , 'data': data}
def searchGroups(self,groupname=''):
if groupname:
#print(self.conn)
rv = self.conn.search('%s' % self.dn, '( &(objectClass=posixGroup)(cn=%s))' %groupname,
attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
else:
rv = self.conn.search('%s' % self.dn, '(objectClass=posixGroup)',
attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
data = []
if rv:
for en in self.conn.entries:
endict = en.entry_get_attributes_dict()
suben = {
'cn': str(en['cn'][0]),
'gidNumber': str(en['gidNumber'][0]),
#'objectClass': ','.join(en['objectClass']),
#'userPassword': en['userPassword'][0],
'dn': str(en.entry_get_dn()),
'memberUid': endict.get('memberUid',[]),
}
data.append(suben)
return data
def deleteGroup(self,name=''):
if not name:
return {'msg':'err','data': 'Group Name not is None'}
group = self.searchGroups(name)
if len(group)== 1:
try:
self.conn.delete(group[0]['dn'])
rv = {'msg':'ok','data':''}
except Exception as e:
rv = {'msg': 'err','data': e}
elif len(group) == 0 :
rv = {'msg': 'err', 'data': 'Not group Find'}
else:
rv = {'msg': 'err', 'data': 'Find %d groups '} %len(group)
return rv
def addUser(self,args):
cndn = 'uid=%s,ou=People,%s' % ( args.get('name'),self.dn)
#object_class = [ 'account','posixAccount', 'top', 'shadowAccount']
if args.get('mail',''):
mail = args.get('mail')
else:
if self.dn == 'dc=test,dc=com':
dn = 'test.com'
mail = '%s@%s' % (args.get('name'), dn)
else:
dn ='.'.join(map(lambda x: x[3:],self.dn.split(',')))
mail = '%s@%s' % (args.get('name'),dn)
attr = {
'objectClass': ['account', 'posixAccount', 'top', 'shadowAccount'],
'userPassword': args.get('passwd'),
'uid': args.get('name'),
'cn': args.get('name'),
'shadowLastChange': '17038',
'shadowMax': '99999',
'shadowWarning': '7',
'uidNumber': args.get('phonenumber'),
'gidNumber': args.get('groupid'),
'homeDirectory': '/home/%s' % args.get('name'),
'mail': mail,
'telephoneNumber': args.get('phonenumber'),
'displayname': args.get('name'),
}
#print('*' *100)
#print(cndn)
#print(attr)
try:
data = self.conn.add(cndn, attributes=attr)
if not data:
attr['objectClass'] = ['posixAccount', 'shadowAccount', 'top', 'person', 'organizationalPerson','inetOrgPerson']
attr['sn'] = args.get('name')
data = self.conn.add(cndn, attributes=attr)
if not data:
msg = self.conn.result['message']
raise ValueError(msg)
msg = 'ok'
except Exception as e:
data = e
msg = 'err'
finally:
self.conn.unbind()
return {'msg': msg , 'data': data}
def deleteUser(self,name=''):
if not name:
return {'msg':'err','data': 'Name not is None'}
if 'dc' in name:
dn = name
else:
dn = 'uid=%s,ou=People,%s' %(name,self.dn)
print(dn)
try:
self.conn.delete(dn)
rv = {'msg':'ok','data':''}
except Exception as e:
rv = {'msg': 'err','data': e}
finally:
self.conn.unbind()
return rv
def searchUser(self,name):
try:
users = self.searchAll()
if name:
data = [ user for user in users if name in user.get('cn') ]
else:
data = users
rv = 'ok'
except Exception as e:
data = e
rv = 'err'
finally:
return {'msg':rv,'data':data}
def searchSinUser(self,name):
try:
ga = self.conn.extend.standard.paged_search(search_base=self.dn,
search_filter='(uid=%s)' %name,
search_scope= SUBTREE,
attributes=ALL_ATTRIBUTES)
user = list(ga)
if user:
data = user[0].get('attributes')
d = {
'user': str(data.get('cn')[0]),
'passwd': data.get('userPassword')[0].decode('utf-8'),
'gid': data.get('gidNumber'),
'mail': data.get('mail')[0],
'phonenumber': data.get('uidNumber')
}
else:
d = {}
users = self.searchAll()
usingle = [d for d in users if d.get('cn') == name ]
u = {}
if usingle:
u = usingle[0]
except Exception as e:
print(e)
u = {'msg': ErrMsg()}
finally:
#print('user=',u)
x = u.update(d)
print('u=',u)
return u
def editUser(self,args):
try:
cn = 'uid=%s,ou=People,%s' % (args.get('user'), self.dn)
print(cn)
passwd = args.get('passwd','')
user = self.searchSinUser(args.get('user'))
data = ''
if user.get('passwd') != passwd:
if 'SSHA' not in passwd:
passwd = self.encodePasswd(passwd)
data = self.conn.modify(cn, {'userPassword': (MODIFY_REPLACE,passwd)})
if user.get('gid') != args.get('gid') and args.get('gid') != 'default':
data = self.conn.modify(cn,{'gidNumber': (MODIFY_REPLACE,args.get('gid')) })
except Exception as e:
ErrMsg()
data = e
finally:
print('data', data)
return data
def setPasswd(self,**kwargs):
try:
cn = 'uid=%s,ou=People,%s' % (kwargs.get('user',''), self.dn)
data = self.conn.modify(cn,{'userPassword':(MODIFY_REPLACE,kwargs.get('passwd', ''))})
except Exception as e:
data = ErrMsg()
finally:
return data
@staticmethod
def GenPasswd():
return ''.join(random.sample(string.ascii_letters string.digits, 8))
def _searchUser_org(self,username):
print('in-search-org')
ga = self.conn.extend.standard.paged_search(search_base=self.dn,
search_filter='(uid=%s)' %username,
search_scope= SUBTREE,
attributes=ALL_ATTRIBUTES)
user = list(ga)
print(user)
entry = self.conn.response[0]
dn = entry['dn']
attr_dict = entry['attributes']
return (dn,attr_dict)
def authUser(self,username,password):
try:
cn = 'uid=%s,ou=People,%s' % (username,self.dn)
conn2 = Connection(self._ip, user=cn, password=password,
check_names=True, lazy=False, raise_exceptions=False)
conn2.bind()
if conn2.result["description"] == "success":
rv = 'ok'
data = '认证成功'
else:
rv = 'err'
data = '认证失败,用户名或密码错误!'
except Exception as e:
rv = 'err'
data = ErrMsg()
finally:
return {'rv': rv,'data':data}
@staticmethod
def encodePasswd(password=''):
"""
:param password:
:return:
"""
if password:
return ssha.encrypt(password,salt_size=12)
return ''
def get_ouname(self,ouname):
"""
ouname = '(|(ou=Groups)(ou=Group))'
:param ouname:
:return:
"""
print(self.dn,ouname)
ou = self.conn.search('%s' % self.dn, '%s' % ouname )
print(ou)
retry = self.conn.entries
num = len(retry)
if num == 0:
return []
else:
return retry[0].entry_get_dn()