from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE import logging import json from pprint import pprint class AD_provaider(): def __init__(self, url:str, serch_tree:str, user, password) -> None: logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s") self.__server = Server(url) self.__connect = Connection(self.__server, user, password, authentication=NTLM) self.__ad_serch_tree = serch_tree if self.__connect.bind(): logging.info("status connect AD.........ok") else: logging.warning("status connect AD.........error") def search_ms_ad(self,search_filter ,filter:list = ["*"], dn = None)->dict: logging.info("search >>>>>>>>>>>>>> AD") if dn is not None: self.__connect.search(dn, search_filter, SUBTREE, attributes=filter) else: self.__connect.search(self.__ad_serch_tree, search_filter, SUBTREE, attributes=filter) response = self.__connect.response_to_json() response = json.loads(response) response = json.dumps(response, ensure_ascii="utf-8") return json.loads(response) if __name__ == "__main__": MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru' SEARCH_FREE_MS = "dc=energo,dc=ru" MS_USER = 'energo\\administrator' PASSWORD = "P@sww0rd" ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD) ################################################################# users_list = [ "bin", "test", "test_01" ] # Здесь находятся список пользователей по которому мы ищем ################################################################## users = {} users["Users"] = [] for user in users_list: data = ad.search_ms_ad(f"(sAMAccountName={user})") user_js = data.get("entries")[0].get("attributes") out = {} out["sAMAccountName"] = user_js.get("sAMAccountName") out["email"] = user_js.get("mail") # out["groups"] = user_js.get("memberOf") ou = [i for i in user_js.get("distinguishedName").split(",") if i.split("=")[0] != "CN" ] out["ou"] = ",".join(str(ou).encode("utf-8")) p = json.dumps(out, ensure_ascii=False) dumps = json.loads(p) users["Users"].append(out) with open("data_users.json", "w", encoding="utf-8") as f: json.dump(users, f, indent=4, ensure_ascii=False)