Lennergo_migrate_script/get_users_data.py

67 lines
2.5 KiB
Python

from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE
import logging
import json
from pprint import pprint
class AD_provaider():
def __init__(self, url:str, serch_tree:str, user, password) -> None:
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
self.__server = Server(url)
self.__connect = Connection(self.__server, user, password, authentication=NTLM)
self.__ad_serch_tree = serch_tree
if self.__connect.bind():
logging.info("status connect AD.........ok")
else:
logging.warning("status connect AD.........error")
def search_ms_ad(self,search_filter ,filter:list = ["*"], dn = None)->dict:
logging.info("search >>>>>>>>>>>>>> AD")
if dn is not None:
self.__connect.search(dn, search_filter, SUBTREE, attributes=filter)
else:
self.__connect.search(self.__ad_serch_tree, search_filter, SUBTREE, attributes=filter)
response = self.__connect.response_to_json()
response = json.loads(response)
response = json.dumps(response, ensure_ascii="utf-8")
return json.loads(response)
if __name__ == "__main__":
MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru'
SEARCH_FREE_MS = "dc=energo,dc=ru"
MS_USER = 'energo\\administrator'
PASSWORD = "P@sww0rd"
ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD)
#################################################################
users_list = [
"bin",
"test",
"test_01"
] # Здесь находятся список пользователей по которому мы ищем
##################################################################
users = {}
users["Users"] = []
for user in users_list:
data = ad.search_ms_ad(f"(sAMAccountName={user})")
user_js = data.get("entries")[0].get("attributes")
out = {}
out["sAMAccountName"] = user_js.get("sAMAccountName")
out["email"] = user_js.get("mail")
# out["groups"] = user_js.get("memberOf")
ou = [i for i in user_js.get("distinguishedName").split(",") if i.split("=")[0] != "CN" ]
out["ou"] = ",".join(str(ou).encode("utf-8"))
p = json.dumps(out, ensure_ascii=False)
dumps = json.loads(p)
users["Users"].append(out)
with open("data_users.json", "w", encoding="utf-8") as f:
json.dump(users, f, indent=4, ensure_ascii=False)