基于 python-ldap 的节点树的 LDAP/AD 便利性
项目描述
node.ext.ldap
概述
node.ext.ldap是一个 LDAP 便利库,用于基于 python-ldap(2.4 或更高版本)和node的 LDAP 通信。
该包包含基本配置和通信对象、一个 LDAP 节点对象和一个使用 node.ext.ugm的基于 LDAP 节点的用户和组管理实现。
这个包是 bda.ldap的继承者。
<nav class="contents" id="contents">内容
与 0.9.x 相比的 API 变化
LDAPNode实例不能再有子树的直接子级。这是一个设计缺陷,因为可能存在重复的 RDN。
LDAPNode.search 默认返回 DN 而不是 RDN。
辅助键和备用键属性功能已完全从LDAPNode中删除。
LDAPProps.check_duplicates设置已被删除。
用法
LDAP 属性
要为 LDAP 定义连接属性,请使用node.ext.ldap.LDAPProps 对象:
>>> from node.ext.ldap import LDAPProps
>>> props = LDAPProps(
... uri='ldap://localhost:12345/',
... user='cn=Manager,dc=my-domain,dc=com',
... password='secret',
... cache=False
... )
使用node.ext.ldap.testLDAPConnectivity测试服务器连接:
>>> from node.ext.ldap import testLDAPConnectivity
>>> assert testLDAPConnectivity(props=props) == 'success'
LDAP 连接
为了处理 LDAP 连接,使用node.ext.ldap.LDAPConnector。它期望构造函数中有一个LDAPProps实例。通常不需要直接实例化这个对象,这发生在创建更高抽象的过程中,见下文:
>>> from node.ext.ldap import LDAPConnector
>>> import ldap
>>> connector = LDAPConnector(props=props)
调用bind创建并返回 LDAP 连接:
>>> conn = connector.bind()
>>> assert isinstance(conn, ldap.ldapobject.ReconnectLDAPObject)
调用unbind会破坏连接:
>>> connector.unbind()
LDAP 通信
为了与 LDAP 服务器通信,使用node.ext.ldap.LDAPCommunicator。它提供了搜索和修改目录所需的所有基本功能。
LDAPCommunicator在创建时需要一个LDAPConnector实例:
>>> from node.ext.ldap import LDAPCommunicator
>>> communicator = LDAPCommunicator(connector)
绑定到服务器:
>>> communicator.bind()
添加目录条目:
>>> communicator.add(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... {
... 'cn': 'foo',
... 'sn': 'Mustermann',
... 'userPassword': 'secret',
... 'objectClass': ['person'],
... }
... )
设置默认搜索 DN:
>>> communicator.baseDN = 'ou=demo,dc=my-domain,dc=com'
在目录中搜索:
>>> import node.ext.ldap
>>> res = communicator.search(
... '(objectClass=person)',
... node.ext.ldap.SUBTREE
... )
>>> assert res == [(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... {
... 'objectClass': ['person'],
... 'userPassword': ['secret'],
... 'cn': ['foo'],
... 'sn': ['Mustermann']
... }
... )]
修改目录项:
>>> from ldap import MOD_REPLACE
>>> communicator.modify(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... [(MOD_REPLACE, 'sn', 'Musterfrau')]
... )
>>> res = communicator.search(
... '(objectClass=person)',
... node.ext.ldap.SUBTREE,
... attrlist=['cn']
... )
>>> assert res == [('cn=foo,ou=demo,dc=my-domain,dc=com', {'cn': ['foo']})]
更改代表用户的目录条目的密码:
>>> communicator.passwd(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... 'secret',
... '12345'
... )
>>> res = communicator.search(
... '(objectClass=person)',
... node.ext.ldap.SUBTREE,
... attrlist=['userPassword']
... )
>>> assert res == [(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... {'userPassword': ['{SSHA}...']}
... )]
删除目录条目:
>>> communicator.delete('cn=foo,ou=demo,dc=my-domain,dc=com')
>>> res = communicator.search(
... '(objectClass=person)',
... node.ext.ldap.SUBTREE
... )
>>> assert res == []
关闭连接:
>>> communicator.unbind()
LDAP 会话
node.ext.ldap.LDAPSession提供了一种更方便的处理 LDAP 的 方法。它基本上提供与LDAPCommunicator相同的功能,但会在执行操作之前自动创建连接对象并检查连接状态。
实例化LDAPSession对象。期望 LDAPProps实例:
>>> from node.ext.ldap import LDAPSession
>>> session = LDAPSession(props)
LDAP 会话可以方便地检查给定的属性:
>>> res = session.checkServerProperties()
>>> assert res == (True, 'OK')
为会话设置默认搜索 DN:
>>> session.baseDN = 'ou=demo,dc=my-domain,dc=com'
在目录中搜索:
>>> res = session.search()
>>> assert res == [
... ('ou=demo,dc=my-domain,dc=com',
... {
... 'objectClass': ['top', 'organizationalUnit'],
... 'ou': ['demo'],
... 'description': ['Demo organizational unit']
... }
... )]
添加目录条目:
>>> session.add(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... {
... 'cn': 'foo',
... 'sn': 'Mustermann',
... 'userPassword': 'secret',
... 'objectClass': ['person'],
... }
... )
更改代表用户的目录条目的密码:
>>> session.passwd('cn=foo,ou=demo,dc=my-domain,dc=com', 'secret', '12345')
验证特定用户:
>>> res = session.authenticate('cn=foo,ou=demo,dc=my-domain,dc=com', '12345')
>>> assert res is True
修改目录项:
>>> session.modify(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... [(MOD_REPLACE, 'sn', 'Musterfrau')]
... )
>>> res = session.search(
... '(objectClass=person)',
... node.ext.ldap.SUBTREE,
... attrlist=['cn']
... )
>>> assert res == [(
... 'cn=foo,ou=demo,dc=my-domain,dc=com',
... {'cn': ['foo']}
... )]
删除目录条目:
>>> session.delete('cn=foo,ou=demo,dc=my-domain,dc=com')
>>> res = session.search('(objectClass=person)', node.ext.ldap.SUBTREE)
>>> assert res == []
关闭会话:
>>> session.unbind()
LDAP 节点
可以将 LDAP 条目作为节点对象来处理。因此 使用node.ext.ldap.LDAPNode。要了解完整的节点 API,请参阅节点包。
创建一个 LDAP 节点。根节点需要基本 DN 和LDAPProps 实例:
>>> from node.ext.ldap import LDAPNode
>>> root = LDAPNode('ou=demo,dc=my-domain,dc=com', props=props)
每个 LDAP 节点都有一个 DN 和一个 RDN:
>>> root.DN
u'ou=demo,dc=my-domain,dc=com'
>>> root.rdn_attr
u'ou'
检查数据库中是否存在已创建的节点:
>>> root.exists
True
目录项还没有子项:
>>> root.keys()
[]
将子节点添加到根节点:
>>> person = LDAPNode()
>>> person.attrs['objectClass'] = ['person', 'inetOrgPerson']
>>> person.attrs['sn'] = 'Mustermann'
>>> person.attrs['userPassword'] = 'secret'
>>> root['cn=person1'] = person
>>> person = LDAPNode()
>>> person.attrs['objectClass'] = ['person', 'inetOrgPerson']
>>> person.attrs['sn'] = 'Musterfrau'
>>> person.attrs['userPassword'] = 'secret'
>>> root['cn=person2'] = person
如果在节点创建期间没有设置 RDN 属性,它会根据节点键计算并自动设置:
>>> person.attrs['cn']
u'person2'
从 LDAP 节点按键获取子 DN:
>>> root.child_dn('cn=person1')
u'cn=person1,ou=demo,dc=my-domain,dc=com'
看看树:
>>> root.printtree()
<ou=demo,dc=my-domain,dc=com - True>
<cn=person2,ou=demo,dc=my-domain,dc=com:cn=person2 - True>
<cn=person1,ou=demo,dc=my-domain,dc=com:cn=person1 - True>
条目尚未写入目录。修改 LDAP 节点树时,一切都在记忆中发生。持久化是通过调用树或它的一部分来完成的。您可以使用更改的 标志检查节点的同步状态。如果 changed 为True,则表示节点属性或节点子节点已更改:
>>> root.changed
True
>>> root()
>>> root.changed
False
修改 LDAP 节点:
>>> person = root['cn=person1']
修改现有属性:
>>> person.attrs['sn'] = 'Mustermensch'
添加新属性:
>>> person.attrs['description'] = 'Mustermensch description'
>>> person()
删除一个属性:
>>> del person.attrs['description']
>>> person()
删除 LDAP 节点:
>>> del root['cn=person2']
>>> root()
>>> root.printtree()
<ou=demo,dc=my-domain,dc=com - False>
<cn=person1,ou=demo,dc=my-domain,dc=com:cn=person1 - False>
搜索 LDAP
添加一些我们将搜索的用户和组:
>>> for i in range(2, 6):
... node = LDAPNode()
... node.attrs['objectClass'] = ['person', 'inetOrgPerson']
... node.attrs['sn'] = 'Surname %s' % i
... node.attrs['userPassword'] = 'secret%s' % i
... node.attrs['description'] = 'description%s' % i
... node.attrs['businessCategory'] = 'group1'
... root['cn=person%s' % i] = node
>>> node = LDAPNode()
>>> node.attrs['objectClass'] = ['groupOfNames']
>>> node.attrs['member'] = [
... root.child_dn('cn=person1'),
... root.child_dn('cn=person2'),
... ]
... node.attrs['description'] = 'IT'
>>> root['cn=group1'] = node
>>> node = LDAPNode()
>>> node.attrs['objectClass'] = ['groupOfNames']
>>> node.attrs['member'] = [
... root.child_dn('cn=person4'),
... root.child_dn('cn=person5'),
... ]
>>> root['cn=group2'] = node
>>> root()
>>> root.printtree()
<ou=demo,dc=my-domain,dc=com - False>
<cn=person1,ou=demo,dc=my-domain,dc=com:cn=person1 - False>
<cn=person2,ou=demo,dc=my-domain,dc=com:cn=person2 - False>
<cn=person3,ou=demo,dc=my-domain,dc=com:cn=person3 - False>
<cn=person4,ou=demo,dc=my-domain,dc=com:cn=person4 - False>
<cn=person5,ou=demo,dc=my-domain,dc=com:cn=person5 - False>
<cn=group1,ou=demo,dc=my-domain,dc=com:cn=group1 - False>
<cn=group2,ou=demo,dc=my-domain,dc=com:cn=group2 - False>
为了定义搜索条件,使用了 LDAP 过滤器,可以通过布尔运算符 '&' 和 '|' 组合:
>>> from node.ext.ldap import LDAPFilter
>>> filter = LDAPFilter('(objectClass=person)')
>>> filter |= LDAPFilter('(objectClass=groupOfNames)')
>>> res = sorted(root.search(queryFilter=filter))
>>> assert res == [
... u'cn=group1,ou=demo,dc=my-domain,dc=com',
... u'cn=group2,ou=demo,dc=my-domain,dc=com',
... u'cn=person1,ou=demo,dc=my-domain,dc=com',
... u'cn=person2,ou=demo,dc=my-domain,dc=com',
... u'cn=person3,ou=demo,dc=my-domain,dc=com',
... u'cn=person4,ou=demo,dc=my-domain,dc=com',
... u'cn=person5,ou=demo,dc=my-domain,dc=com'
... ]
定义多个条件 LDAP 过滤器:
>>> from node.ext.ldap import LDAPDictFilter
>>> filter = LDAPDictFilter({
... 'objectClass': ['person'],
... 'cn': 'person1'
... })
>>> res = root.search(queryFilter=filter)
>>> assert res == [u'cn=person1,ou=demo,dc=my-domain,dc=com']
定义关系 LDAP 过滤器。在这种情况下,我们在组 'cn' 和人员 'businessCategory' 之间建立关系:
>>> from node.ext.ldap import LDAPRelationFilter
>>> filter = LDAPRelationFilter(root['cn=group1'], 'cn:businessCategory')
>>> res = root.search(queryFilter=filter)
>>> assert res == [
... u'cn=person2,ou=demo,dc=my-domain,dc=com',
... u'cn=person3,ou=demo,dc=my-domain,dc=com',
... u'cn=person4,ou=demo,dc=my-domain,dc=com',
... u'cn=person5,ou=demo,dc=my-domain,dc=com'
... ]
可以组合不同的 LDAP 过滤器类型:
>>> filter &= LDAPFilter('(cn=person2)')
>>> str(filter)
'(&(businessCategory=group1)(cn=person2))'
LDAPNode.search接受以下关键字参数。如果使用多个关键字,请在适当的情况下将搜索条件与“&”结合起来。
如果给定attrlist,则结果项由 2 元组组成,其中 dict 包含位置 1 处的请求属性:
- 查询过滤器
LDAP 过滤器实例或字符串。如果给定的参数是字符串类型,则创建一个LDAPFilter实例。
- 标准
包含搜索条件的字典。创建了一个LDAPDictFilter实例。
- 属性列表
要返回的属性名称列表。 允许使用特殊属性rdn和dn 。
- 关系
LDAPRelationFilter实例或定义关系的字符串。如果给定的参数是字符串类型,则创建一个LDAPRelationFilter实例。
- 关系节点
结合关系参数,当作为字符串给出时,使用 relation_node而不是self 来创建过滤器。
- 完全符合
标记是否期望 1 长度的结果。如果结果为空或找到多个条目,则引发错误。
- or_search
结合条件,这个参数被传递给创建LDAPDictFilter。此标志控制是否将标准键 和值与“&”或“|”组合。
- or_keys
结合条件,这个参数被传递给创建LDAPDictFilter。此标志控制条件键是否与“|”组合 代替 '&'。
- or_values
结合条件,这个参数被传递给创建LDAPDictFilter。此标志控制标准值是否与“|”组合 代替 '&'。
- 页面大小
与cookie一起用于查询分页结果。
- 曲奇饼
与page_size一起用于查询分页结果。
- 获取节点
如果True结果包含LDAPNode实例而不是 DN
您可以在节点上定义搜索默认值,在此节点上调用搜索时始终考虑这些默认值。如果设置,它们总是与任何(可选)通过的过滤器结合使用'&'。
定义默认搜索范围:
>>> from node.ext.ldap import SUBTREE
>>> root.search_scope = SUBTREE
定义默认搜索过滤器,可以是 LDAPFilter、LDAPDictFilter、LDAPRelationFilter 或字符串类型:
>>> root.search_filter = LDAPFilter('objectClass=groupOfNames')
>>> res = root.search()
>>> assert res == [
... u'cn=group1,ou=demo,dc=my-domain,dc=com',
... u'cn=group2,ou=demo,dc=my-domain,dc=com'
... ]
>>> root.search_filter = None
将默认搜索条件定义为 dict:
>>> root.search_criteria = {'objectClass': 'person'}
>>> res = root.search()
>>> assert res == [
... u'cn=person1,ou=demo,dc=my-domain,dc=com',
... u'cn=person2,ou=demo,dc=my-domain,dc=com',
... u'cn=person3,ou=demo,dc=my-domain,dc=com',
... u'cn=person4,ou=demo,dc=my-domain,dc=com',
... u'cn=person5,ou=demo,dc=my-domain,dc=com'
... ]
定义默认搜索关系:
>>> root.search_relation = LDAPRelationFilter(
... root['cn=group1'],
... 'cn:businessCategory'
... )
>>> res = root.search()
>>> assert res == [
... u'cn=person2,ou=demo,dc=my-domain,dc=com',
... u'cn=person3,ou=demo,dc=my-domain,dc=com',
... u'cn=person4,ou=demo,dc=my-domain,dc=com',
... u'cn=person5,ou=demo,dc=my-domain,dc=com'
... ]
同样,与关键字参数一样,多个定义的默认值是 '&' 组合的:
# empty result, there are no groups with group 'cn' as 'description'
>>> root.search_criteria = {'objectClass': 'group'}
>>> res = root.search()
>>> assert res == []
JSON序列化
序列化和反序列化 LDAP 节点:
>>> root = LDAPNode('ou=demo,dc=my-domain,dc=com', props=props)
序列化孩子:
>>> from node.serializer import serialize
>>> json_dump = serialize(root.values())
清除并保留根:
>>> root.clear()
>>> root()
反序列化 JSON 转储:
>>> from node.serializer import deserialize
>>> deserialize(json_dump, root=root)
[<cn=person1,ou=demo,dc=my-domain,dc=com:cn=person1 - True>,
<cn=person2,ou=demo,dc=my-domain,dc=com:cn=person2 - True>,
<cn=person3,ou=demo,dc=my-domain,dc=com:cn=person3 - True>,
<cn=person4,ou=demo,dc=my-domain,dc=com:cn=person4 - True>,
<cn=person5,ou=demo,dc=my-domain,dc=com:cn=person5 - True>,
<cn=group1,ou=demo,dc=my-domain,dc=com:cn=group1 - True>,
<cn=group2,ou=demo,dc=my-domain,dc=com:cn=group2 - True>]
由于已给出 root,因此添加了已创建的节点:
>>> root()
>>> root.printtree()
<ou=demo,dc=my-domain,dc=com - False>
<cn=person1,ou=demo,dc=my-domain,dc=com:cn=person1 - False>
<cn=person2,ou=demo,dc=my-domain,dc=com:cn=person2 - False>
<cn=person3,ou=demo,dc=my-domain,dc=com:cn=person3 - False>
<cn=person4,ou=demo,dc=my-domain,dc=com:cn=person4 - False>
<cn=person5,ou=demo,dc=my-domain,dc=com:cn=person5 - False>
<cn=group1,ou=demo,dc=my-domain,dc=com:cn=group1 - False>
<cn=group2,ou=demo,dc=my-domain,dc=com:cn=group2 - False>
非简单与简单模式。创建带有孩子的容器:
>>> container = LDAPNode()
>>> container.attrs['objectClass'] = ['organizationalUnit']
>>> root['ou=container'] = container
>>> person = LDAPNode()
>>> person.attrs['objectClass'] = ['person', 'inetOrgPerson']
>>> person.attrs['sn'] = 'Mustermann'
>>> person.attrs['userPassword'] = 'secret'
>>> container['cn=person1'] = person
>>> root()
默认模式下的序列化包含特定于类型的信息。因此 JSON 转储可以在以后反序列化:
>>> serialized = s