python 操作 openldap 基本操作

# -*- coding: utf-8 -*-
# author : s

import random,string
from ldap3 import Server,Connection,ALL,SUBTREE,ALL_ATTRIBUTES,MODIFY_REPLACE,MODIFY_ADD,MODIFY_DELETE
from passlib.hash import  ldap_salted_sha1 as ssha
from app.utils import  ErrMsg

class LdapOp(object):

"""
    Operation Dcouments: http://ldap3.readthedocs.io/

"""

def __init__(self,ip,port,user,passwd):
        self._ip = ip
        self._port = port
        self._user = user
        self._passwd = passwd
        self.dn = self._user.split(',',1)[1]
        self.s = Server(self._ip,self._port,get_info=ALL)
        self._conn =  Connection(self.s, self._user, self._passwd, auto_bind=True)

@property
    def conn(self):

if not self._conn:
            print('ldap conn init ')
            self._conn =  Connection(self.s, self._user, self._passwd, auto_bind=True)

return self._conn

def searchAll(self,name='top'):
        entries = self.conn.extend.standard.paged_search(self.dn, '(objectClass=%s)' % name ,attributes=['cn', 'givenName','gidNumber','uidNumber','telephoneNumber','mail'])
        en = list(entries)

ulist = [v for v in en if  'People' in v.get('dn','')  and  'uid' in v.get('dn','') ]

l = []
        for v in ulist:
            udict = {}
            udict['dn'] = v['dn']
            udict['cn'] =  v['attributes'].get('cn','')[0]  if len(v['attributes'].get('cn','')) > 0 else v['attributes'].get('cn','')
            udict['gid'] = str(v['attributes'].get('gidNumber',''))
            udict['mail'] = v['attributes'].get('mail','')[0]  if len(v['attributes'].get('mail','')) > 0 else v['attributes'].get('mail','')
            udict['phone'] = v['attributes'].get('telephoneNumber','')[0]  if len(v['attributes'].get('telephoneNumber','')) > 0 else v['attributes'].get('telephoneNumber','')
            l.append(udict)

# append user is groups
        groups = self.searchGroups()
        ##  添加 组 到 用户的连接

for user in l:
            user['groups'] = []
            for group in groups:
                # group.get('memberUid',[])
                if user.get('cn', '') in group.get('memberUid', []):
                    user['groups'].append(group.get('cn'))
        #print(l)
        return l

def getGroups(self):
        l = []

return l

def user_addGroup(self, group_dn, user_cn=[]):
        data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_ADD, user_cn)]})
        return self.conn.entries

def user_deleteGroup(self,group_dn,user_cn=[]):
        data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_DELETE,user_cn)]})
        print(data)
        return self.conn.entries

def addGroup(self,args):

ouname = self.get_ouname('(|(ou=Groups)(ou=Group))')
        print('ouname=',ouname)
        groupname = args.get('name')
        groupid = args.get('groupid')
        #grouppassword = args.get('grouppassword')

cndn = 'cn=%s,%s' % (groupname,ouname)
        print(cndn)
        attr = {
            'objectClass': ['posixGroup', 'top'],
            'cn' : groupname,
            'gidNumber': groupid,
            #'userPassword': grouppassword
        }
        sgroup = [ g for g in self.searchGroups() if g.get('cn','') == groupname or g.get('gidNumber','') == groupid ]
        if sgroup:
            return {'msg': 'err', 'data': u' 组名字或组ID重复,请检查后重新添加!'}
        try:
            data = self.conn.add(cndn, attributes=attr)
            msg = 'ok'
        except Exception as e:

data  = e
            msg = 'err'
        finally:
            #self.conn.unbind()
            return {'msg': msg , 'data': data}

def searchGroups(self,groupname=''):

if groupname:
            #print(self.conn)
            rv = self.conn.search('%s' % self.dn, '( &(objectClass=posixGroup)(cn=%s))' %groupname,
               attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
        else:
            rv = self.conn.search('%s' % self.dn, '(objectClass=posixGroup)',
               attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
        data = []

if rv:
            for en in  self.conn.entries:
                endict = en.entry_get_attributes_dict()
                suben = {
                    'cn': str(en['cn'][0]),
                    'gidNumber': str(en['gidNumber'][0]),
                    #'objectClass': ','.join(en['objectClass']),
                    #'userPassword': en['userPassword'][0],
                    'dn': str(en.entry_get_dn()),
                    'memberUid': endict.get('memberUid',[]),
                }
                data.append(suben)

return data

def deleteGroup(self,name=''):

if not name:
            return {'msg':'err','data': 'Group Name not is None'}

group = self.searchGroups(name)

if len(group)== 1:

try:

self.conn.delete(group[0]['dn'])

rv =  {'msg':'ok','data':''}
            except Exception as e:
                rv =  {'msg': 'err','data': e}

elif len(group) == 0 :
            rv = {'msg': 'err', 'data': 'Not group Find'}

else:
            rv = {'msg': 'err', 'data': 'Find %d groups '} %len(group)

return rv

def addUser(self,args):

cndn = 'uid=%s,ou=People,%s' % ( args.get('name'),self.dn)
        #object_class = [ 'account','posixAccount', 'top', 'shadowAccount']

if args.get('mail',''):
            mail = args.get('mail')
        else:
            if self.dn == 'dc=test,dc=com':
                dn = 'test.com'
                mail = '%[email protected]%s' % (args.get('name'), dn)
            else:
                dn ='.'.join(map(lambda x: x[3:],self.dn.split(',')))
                mail = '%[email protected]%s' % (args.get('name'),dn)
        attr = {
            'objectClass': ['account', 'posixAccount', 'top', 'shadowAccount'],
            'userPassword': args.get('passwd'),
            'uid': args.get('name'),
            'cn': args.get('name'),
            'shadowLastChange': '17038',
            'shadowMax': '99999',
            'shadowWarning': '7',
            'uidNumber': args.get('phonenumber'),
            'gidNumber': args.get('groupid'),
            'homeDirectory': '/home/%s' % args.get('name'),
            'mail': mail,
            'telephoneNumber': args.get('phonenumber'),
            'displayname': args.get('name'),
        }
        #print('*' *100)
        #print(cndn)
        #print(attr)
        try:
            data = self.conn.add(cndn, attributes=attr)
            if not data:
                
                attr['objectClass'] = ['posixAccount', 'shadowAccount', 'top', 'person', 'organizationalPerson','inetOrgPerson']
                attr['sn'] = args.get('name')
          
                data = self.conn.add(cndn, attributes=attr)
                if not data:
                    
                    msg = self.conn.result['message']
                    raise ValueError(msg)
            msg = 'ok'
        except Exception as e:

data  = e
            msg = 'err'
        finally:
            self.conn.unbind()
        return {'msg': msg , 'data': data}

def deleteUser(self,name=''):
        if not name:
            return {'msg':'err','data': 'Name not is None'}
        if 'dc' in name:
            dn = name
        else:
            dn = 'uid=%s,ou=People,%s' %(name,self.dn)
        print(dn)
        try:
            self.conn.delete(dn)
            rv =  {'msg':'ok','data':''}
        except Exception as e:
            rv =  {'msg': 'err','data': e}
        finally:
            self.conn.unbind()
        return  rv

def searchUser(self,name):
        try:
            users = self.searchAll()
            if name:
                data = [ user for user in  users if name in  user.get('cn') ]
            else:
                data = users
            rv = 'ok'
        except Exception as e:
            data = e
            rv = 'err'
        finally:
            return {'msg':rv,'data':data}

def searchSinUser(self,name):
        try:
            ga = self.conn.extend.standard.paged_search(search_base=self.dn,
                                           search_filter='(uid=%s)' %name,
                                           search_scope= SUBTREE,
                                           attributes=ALL_ATTRIBUTES)
            user = list(ga)
            if user:
                data = user[0].get('attributes')

d = {
                    'user': str(data.get('cn')[0]),
                    'passwd': data.get('userPassword')[0].decode('utf-8'),
                    'gid': data.get('gidNumber'),
                    'mail': data.get('mail')[0],
                    'phonenumber': data.get('uidNumber')

}
                
            else:
                d = {}
            users = self.searchAll()
            usingle = [d for d in users if d.get('cn') == name ]
            u = {}
            if usingle:
                u = usingle[0]
        except Exception as e:
            print(e)
            u = {'msg': ErrMsg()}

finally:
            #print('user=',u)
            x = u.update(d)
            print('u=',u)
            return u

def editUser(self,args):
        try:
            cn = 'uid=%s,ou=People,%s' % (args.get('user'), self.dn)
            print(cn)
            passwd = args.get('passwd','')

user = self.searchSinUser(args.get('user'))
            data = ''
            if user.get('passwd') != passwd:
                if 'SSHA' not in passwd:
                    passwd = self.encodePasswd(passwd)
                   
                    data = self.conn.modify(cn, {'userPassword': (MODIFY_REPLACE,passwd)})
            if user.get('gid') != args.get('gid') and  args.get('gid') != 'default':
                data = self.conn.modify(cn,{'gidNumber': (MODIFY_REPLACE,args.get('gid')) })

except Exception as e:
            ErrMsg()
            data = e
        finally:
            print('data', data)
            return data

def setPasswd(self,**kwargs):
        try:
            cn = 'uid=%s,ou=People,%s' % (kwargs.get('user',''), self.dn)
            data = self.conn.modify(cn,{'userPassword':(MODIFY_REPLACE,kwargs.get('passwd', ''))})
        except Exception as e:
            data = ErrMsg()
        finally:
            return data

@staticmethod
    def GenPasswd():
        return ''.join(random.sample(string.ascii_letters + string.digits, 8))

def _searchUser_org(self,username):
        print('in-search-org')
        ga = self.conn.extend.standard.paged_search(search_base=self.dn,
                                       search_filter='(uid=%s)' %username,
                                       search_scope= SUBTREE,
                                       attributes=ALL_ATTRIBUTES)
        user = list(ga)
        print(user)
        entry = self.conn.response[0]
        dn = entry['dn']
        attr_dict = entry['attributes']

return (dn,attr_dict)

def authUser(self,username,password):
        
        try:
            cn = 'uid=%s,ou=People,%s' % (username,self.dn)
            conn2 = Connection(self._ip, user=cn, password=password,
                               check_names=True, lazy=False, raise_exceptions=False)
            conn2.bind()
            if conn2.result["description"] == "success":
                rv = 'ok'
                data = '认证成功'
            else:
                rv = 'err'
                data = '认证失败,用户名或密码错误!'
        except Exception as e:
            rv = 'err'
            data = ErrMsg()
        finally:
            return {'rv': rv,'data':data}

@staticmethod
    def encodePasswd(password=''):
        """
        :param password:
        :return:
        """
        if password:
            return ssha.encrypt(password,salt_size=12)
        return ''

def get_ouname(self,ouname):
        """
         ouname = '(|(ou=Groups)(ou=Group))'

:param ouname:
        :return:
        """
        print(self.dn,ouname)
        ou = self.conn.search('%s' % self.dn, '%s' % ouname )
        print(ou)
        retry = self.conn.entries
        num = len(retry)
        if num == 0:
            return []

else:
            return retry[0].entry_get_dn()

原文地址:http://blog.51cto.com/5766902/2131896

时间: 2024-08-30 17:18:14

python 操作 openldap 基本操作的相关文章

使用python操作InfluxDB

环境: CentOS6.5_x64InfluxDB版本:1.1.0Python版本 : 2.6 准备工作 启动服务器 执行如下命令: service influxdb start 示例如下: [[email protected] ~]# service influxdb start Starting influxdb... influxdb process was started [ OK ] [[email protected] ~]# 安装influxdb-python github地址: 

Python操作 Memcache、Redis、RabbitMQ

Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. memcached安装: memcached -d -m 10 -u root -l 192.168.132.130 -p

Python 操作 MySQL数据库

一.安装 MySQL 可以直接从MySQL官方网站下载最新版本.MySQL是跨平台的,选择对应的平台下载安装文件,安装即可. 如果是Windows用户,那么安装过程非常简单,直接根据向导一步一步操作即可. 如果是 Linux 用户,安装过程也是相当简单的. ## Ubuntu / Debian $ sudo apt-get install mysql-server $ sudo apt-get install mysql-client ## CentOS / RHEL # yum install

06 python操作MySQL和redis(进阶)

python操作mysql.redis 阶段一.mysql事务 主要用于处理操作量大,复杂度高的数据.比如说,在人员管理系统中,你删除一个人员,你即需要删除人员的基本资料,也要删除和该人员相关的信息,如信箱,文章等等,这样,这些数据库操作语句就构成一个事务! 事务处理可以用来维护数据库的完整性,保证成批的 SQL 语句要么全部执行,要么全部不执行. 事务用来管理 insert.update.delete 语句 事务必须满足4个条件(ACID):原子性(Atomicity,或称不可分割性).一致性

Python操作数据库(mysql redis)

一.python操作mysql数据库: 数据库信息:(例如211.149.218.16   szz  123456) 操作mysql用pymysql模块 #操作其他数据库,就安装相应的模块 import  pymysql ip='211.149.218.16' port=3306 passwd='123456' user='root' db='szz' conn=pymysql.connect(host=ip,user=user,port=port,passwd=passwd,db=db,cha

python操作mysql ------- SqlAchemy正传

本篇对于Python操作MySQL主要使用两种方式: 原生模块 pymsql ORM框架 SQLAchemy pymsql pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同. 下载安装 pip3 install pymysql 使用操作 1.执行SQL #!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql # 创建连接 conn = pymysql.connect(host='127.0.0.1

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

python操作mysql数据库

连接数据库 输入值 存入数据库 关闭 import string import mysql.connector conn=mysql.connector.connect(user='root',password='test',database='dalian',use_unicode=True) cursor=conn.cursor() a=raw_input('enter an id: ') b=raw_input('enter a name: ') while(a!='quit' or b!

python操作MySQL

本篇对于Python操作MySQL主要使用两种方式: 原生模块 pymsql ORM框架 SQLAchemy pymsql pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同. 下载安装 ? 1 pip3 install pymysql 使用操作 1.执行SQL + ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 #!/usr/bin/env python # -*-