from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE from samba.samdb import SamDB from samba.auth import system_session from samba.ndr import ndr_pack, ndr_unpack from samba.dcerpc import security import samba.param import logging from pprint import pprint import json import subprocess import csv class AD_provaider(): def __init__(self, url:str, serch_tree:str, user, password) -> None: logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s") self.__server = Server(url) self.__connect = Connection(self.__server, user, password, authentication=NTLM) self.__ad_serch_tree = serch_tree if self.__connect.bind(): logging.info("status connect AD.........ok") else: logging.warning("status connect AD.........error") def search_ms_ad(self,search_filter ,filter:list = ["*"], dn = None)->dict: logging.info("search >>>>>>>>>>>>>> AD") if dn is not None: self.__connect.search(dn, search_filter, SUBTREE, attributes=filter) else: self.__connect.search(self.__ad_serch_tree, search_filter, SUBTREE, attributes=filter) response = self.__connect.response_to_json() response = json.loads(response) response = json.dumps(response, ensure_ascii="utf-8") return json.loads(response) class Samba_provaider(): def __init__(self, path, password) -> None: self.__lp = samba.param.LoadParm() self.__lp.load(samba.param.default_path()) #или lp.load("/etc/samba/smb.conf") self.__sam = SamDB(lp=self.__lp, session_info=system_session()) self.__base = path self.__defsult_password = password def search_samba_ad(self, search_filter, attrs = ["*"], dn = None): if dn is not None: res = self.__sam.search(base=dn, expression=search_filter, attrs=attrs) return res return self.__sam.search(base=self.__base, expression=search_filter, attrs=attrs) def add_ou(self, name): out = subprocess.call(['samba-tool', "ou", "create", f"{name}"],restore_signals=True) if out == 0: return True return False def add_group(self, name_group, ou): out = subprocess.call(['samba-tool', "group", "add", f"{name_group}", "--groupou", f"{ou}"], restore_signals=True) if out == 0: return True return False def create_user(self, data:dict, ou:str)->bool: cmd = ['samba-tool', "user", "add"] if data is not None: data = data.get("entries")[0].get("attributes") if data.get("sAMAccountName"): cmd.append(data.get("sAMAccountName")) cmd.append(self.__defsult_password) if data.get("givenName"): cmd.append("--given-name={}".format(data.get("givenName"))) if data.get("sn"): cmd.append("--surname={}".format(data.get("sn"))) if data.get("telephoneNumber"): cmd.append("--telephone-number={}".format(data.get("telephoneNumber"))) if data.get("mail"): cmd.append("--mail-address={}".format(data.get("mail"))) if data.get("company"): cmd.append("--company={}".format(data.get("company"))) if data.get("department"): cmd.append("--department={}".format(data.get("department"))) if data.get("title"): cmd.append("--job-title={}".format(data.get("title"))) if data.get("description"): cmd.append("--description={}".format(",".join(data.get("description")))) if data.get("physicalDeliveryOfficeName"): cmd.append("--physical-delivery-office={}".format(data.get("physicalDeliveryOfficeName"))) if data.get("profilePath"): cmd.append("--profile-path={}".format(data.get("profilePath"))) if data.get("scriptPath"): cmd.append("--script-path={}".format(data.get("scriptPath"))) if data.get("wWWHomePage"): cmd.append("--internet-address={}".format(data.get("wWWHomePage"))) if data.get("initials"): cmd.append("--initials={}".format(data.get("initials"))) # if data.get("homeDirectory"): # cmd.append("--home-directory={}".format(data.get("homeDirectory"))) cmd.append("--must-change-at-next-login") cmd.append("--use-username-as-cn") cmd.append( "--userou={}".format(ou)) out = subprocess.call(cmd,restore_signals=True, shell=False) if out == 0: logging.info( " -> Create user name: {} <-".format(data.get("sAMAccountName"))) self.__add_attr_user(data.get("sAMAccountName"), data) return True logging.warning(" -> this user already exists <-") return False else: return False def __add_attr_user(self, user:str, data:dict)->bool: logging.info("-> Run func add attr <-") dn = None d = self.search_samba_ad("sAMAccountName={}".format(user)) for i in d: if i.dn is not None: dn = i.dn out_str = [] out_str.append("dn: {}".format(str(dn))) out_str.append("changetype: modify") if data.get("manager"): manager_dn = [] for i in data.get("manager").split(","): if i.split("=")[0] != "DC": manager_dn.append(i) for item in str(dn).split(","): if str(item).split("=")[0] == "DC": manager_dn.append(item) out_str.append("add: manager") out_str.append("manager: {}".format(",".join(manager_dn))) if data.get("streetAddress"): out_str.append("add: streetAddress") out_str.append("streetAddress: {}".format(data.get("streetAddress"))) if data.get("l"): out_str.append("add: l") out_str.append("l: {}".format(data.get("l"))) if data.get("st"): out_str.append("add: st") out_str.append("st: {}".format(data.get("st"))) if data.get("postalCode"): out_str.append("add: postalCode") out_str.append("postalCode: {}".format(data.get("postalCode"))) if data.get("postOfficeBox"): out_str.append("add: postOfficeBox") d = ",".join(data.get("postOfficeBox")) out_str.append("postOfficeBox: {}".format(d)) if data.get("pager"): out_str.append("add: pager") out_str.append("pager: {}".format(data.get("pager"))) if data.get("c"): out_str.append("add: c") out_str.append("c: {}".format(data.get("c"))) if data.get("co"): out_str.append("add: co") out_str.append("co: {}".format(data.get("co"))) if data.get("mobile"): out_str.append("add: mobile") out_str.append("mobile: {}".format(data.get("mobile"))) if data.get("ipPhone"): out_str.append("add: ipPhone") out_str.append("ipPhone: {}".format(data.get("ipPhone"))) if data.get("info"): out_str.append("add: info") out_str.append("info: {}".format(data.get("info"))) if data.get("homePhone"): out_str.append("add: homePhone") out_str.append("homePhone: {}".format(data.get("homePhone"))) if data.get("facsimileTelephoneNumber"): out_str.append("add: facsimileTelephoneNumber") out_str.append("facsimileTelephoneNumber: {}".format(data.get("facsimileTelephoneNumber"))) try: cmd = ["ldbmodify", "-H", "/var/lib/samba/private/sam.ldb", "/tmp/{}.ldif".format(user)] with open("/tmp/{}.ldif".format(user), "w") as file: file.write("\n".join(out_str)) out = subprocess.call(cmd,restore_signals=True, shell=False) print(out) if out == 0: return True return False except Exception as ex: logging.error(ex) return False class Manager(): def __init__(self, smb:Samba_provaider, ad:AD_provaider) -> None: self.__smb = smb self.__ad = ad def open_csv_file(self, path)->list: out_list = [] try: with open(path, encoding="utf-8") as f: reader = csv.reader(f, delimiter="\t" ) next(reader) next(reader) for row in reader: out_list.append(row[0].split(";")) return out_list except Exception as ex: logging.error(ex) print("при открытии файла произошла ошибка необходимо проверить кодировку файла, а также путь до файла") return None def run_add_account(self, users_data:dict): for elems in users_data: ou = [] ou_list = elems.get("samba_ad").split(",") for items in ou_list: if items.split("=")[0] == "OU": ou.append(items) self.__smb.create_user(ad.search_ms_ad("(sAMAccountName={})".format(elems["user_name"])), ",".join(ou)) if __name__ == "__main__": MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru' SEARCH_FREE_MS = "dc=energo,dc=ru" MS_USER = 'energo\\administrator' PASSWORD = "P@sww0rd" PATH_CSV = "MigrBatch1.csv" SEARCH_FREE_SAMBA = "dc=lenenergo,dc=ru" DEFAULT_PASSWORD_USER = "!passw0rd" logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s") logging.info("> Run script <") ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD) logging.info("> init AD_provaider <") smb = Samba_provaider(SEARCH_FREE_SAMBA, DEFAULT_PASSWORD_USER) logging.info("> init Samba_provaider <") manager = Manager(smb, ad) data_csv = manager.open_csv_file(PATH_CSV) data_user = [] for users in data_csv: d = {} d["user_name"] = users[2] d["ms_ou"] = users[4] d["samba_ad"] = users[5] data_user.append(d) manager.run_add_account(data_user)