Загрузить файлы в «/»

main
dkovtun 2024-05-23 12:47:56 +03:00
commit e7dcdb7eb2
5 changed files with 676 additions and 0 deletions

4
List_groups.csv Normal file
View File

@ -0,0 +1,4 @@
SamAccountName,"OU"
Base_group,OU=test_unit_alt
123_test_group,"OU=3,OU=1,OU=test_unit_alt"
Test_12,"OU=New_comps,OU=!Energo"
1 SamAccountName OU
2 Base_group OU=test_unit_alt
3 123_test_group OU=3,OU=1,OU=test_unit_alt
4 Test_12 OU=New_comps,OU=!Energo

5
MigrBatch1.csv Normal file
View File

@ -0,0 +1,5 @@
MigrBatch1
BatchName;DateMigration;SamAccountName;EmailAddress;UserSourceOU;UserTargetOU;ComputerName;ComputerMAC;ComputerDomain;ComputerSourceOU;ComputerTargetOU;ISO;VIPCritical;Notes;Software
1;15.05.2024;bin;bin@lennergo.ru;OU=test_unit_alt,DC=energo,DC=ru;OU=test_unit_alt,DC=lennergo,DC=ru;winclient1;;;CN=Computers,DC=energo,DC=ru;CN=Computers,DC=lennergo,DC=ru;;;;
1;15.05.2024;test;test@lennergo.ru;OU=test_unit_alt,DC=energo,DC=ru;OU=test_unit_alt,DC=lennergo,DC=ru;winclient2;;;CN=Computers,DC=alt-test,DC=corp;CN=Computers,DC=samba,DC=alt;;;;
1;15.05.2024;test_01;test_01@lennergo.ru;OU=3,OU=1,OU=test_unit_alt,DC=energo,DC=ru;OU=3,OU=1,OU=test_unit_alt,DC=lennergo,DC=ru;winclient2;;;CN=Computers,DC=alt-test,DC=corp;CN=Computers,DC=samba,DC=alt;;;;
1 MigrBatch1
2 BatchName;DateMigration;SamAccountName;EmailAddress;UserSourceOU;UserTargetOU;ComputerName;ComputerMAC;ComputerDomain;ComputerSourceOU;ComputerTargetOU;ISO;VIPCritical;Notes;Software
3 1;15.05.2024;bin;bin@lennergo.ru;OU=test_unit_alt,DC=energo,DC=ru;OU=test_unit_alt,DC=lennergo,DC=ru;winclient1;;;CN=Computers,DC=energo,DC=ru;CN=Computers,DC=lennergo,DC=ru;;;;
4 1;15.05.2024;test;test@lennergo.ru;OU=test_unit_alt,DC=energo,DC=ru;OU=test_unit_alt,DC=lennergo,DC=ru;winclient2;;;CN=Computers,DC=alt-test,DC=corp;CN=Computers,DC=samba,DC=alt;;;;
5 1;15.05.2024;test_01;test_01@lennergo.ru;OU=3,OU=1,OU=test_unit_alt,DC=energo,DC=ru;OU=3,OU=1,OU=test_unit_alt,DC=lennergo,DC=ru;winclient2;;;CN=Computers,DC=alt-test,DC=corp;CN=Computers,DC=samba,DC=alt;;;;

270
create_groups.py Normal file
View File

@ -0,0 +1,270 @@
#!/usr/bin/python3
from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE
from samba.samdb import SamDB
from samba.auth import system_session
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
import samba.param
import logging
from pprint import pprint
import json
import csv
import subprocess
import sched
import time
import datetime
class AD_provaider():
def __init__(self, url:str, serch_tree:str, user, password) -> None:
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
self.__server = Server(url)
self.__connect = Connection(self.__server, user, password, authentication=NTLM)
self.__ad_serch_tree = serch_tree
if self.__connect.bind():
logging.info("status connect AD.........ok")
else:
logging.warning("status connect AD.........error")
def search_ms_ad(self, search_filter ,filter:list = ["*"])->dict:
logging.info("search >>>>>>>>>>>>>> AD")
self.__connect.search(self.__ad_serch_tree, search_filter, SUBTREE, attributes=filter)
response = self.__connect.response_to_json()
response = json.loads(response)
response = json.dumps(response, ensure_ascii="utf-8")
return json.loads(response)
class Samba_provaider():
def __init__(self, path) -> None:
self.__lp = samba.param.LoadParm()
self.__lp.load(samba.param.default_path()) #или lp.load("/etc/samba/smb.conf")
self.__sam = SamDB(lp=self.__lp, session_info=system_session())
self.__base = path
def search_samba_ad(self, search_filter, attrs = ["*"]):
res = self.__sam.search(base=self.__base, expression=search_filter, attrs=attrs)
return res
def add_ou(self, name):
out = subprocess.call(['samba-tool', "ou", "create", f"{name}"],restore_signals=True)
if out == 0:
return True
return False
def add_group(self, name_group, ou):
out = subprocess.call(['samba-tool', "group", "add", f"{name_group}", "--groupou", f"{ou}"],restore_signals=True)
if out == 0:
return True
return False
class Manager():
def __init__(self, samba_prov:Samba_provaider, ad:AD_provaider) -> None:
self._smb = samba_prov
self.__ad = ad
def find_user_by_groups(self, name:str, ou:str, base_ou:str)->dict:
str_base_ou = self.__get_str_base_ou(base_ou)
return self.__ad.search_ms_ad(f"(memberOf=CN={name},{ou},{str_base_ou})")
def find_group(self, name, ou, base):
base_ou = self.__get_str_base_ou(base)
return self.__ad.search_ms_ad(f"(&(objectCategory=group)(name={name})(distinguishedName=CN={name},{ou},{base_ou}))")
def __get_str_base_ou(self, base:str)->str:
ou_list = []
for i in base.split(","):
dn = i.split("=")[0]
v = i.split("=")[1]
out = "{}={}".format( dn.upper(), v)
ou_list.append(out)
return ",".join(ou_list)
def open_file_list_groups(self, path:str)->dict:
list_grups = []
with open(path, encoding="utf-8") as sv:
render = csv.DictReader(sv)
for i in render:
list_grups.append(i)
return list_grups
def find_smb_groups(self, name:str, ou:str, base:str):
group = self._smb.search_samba_ad(f"(&(objectCategory=group)(name={name})(distinguishedName=CN={name},{ou},{self.__get_str_base_ou(base)}))")
for i in group:
return dict(i)
def create_group_for_samba(self, name:str, ou:str, data_group:dict, base:str )->bool:
cmd = self._smb.add_group(name, ou)
if cmd:
self.__create_attrs(data_group, ou, base)
return True
return False
def __create_attrs(self, data_group:dict, ou, base_ou:str)->int:
logging.info("-> add attr <-")
attr_list = []
base_ou = self.__get_str_base_ou(base_ou)
l = []
for i in data_group.get("entries")[0].get("dn").split(","):
if i.split("=")[0] != "DC":
l.append(i)
l.append(base_ou)
dn = ",".join(l)
attr_list.append("dn: {}".format(dn))
attr_list.append("changetype: modify")
data = data_group.get("entries")[0].get("attributes")
if data.get("description"):
attr_list.append("add:description")
attr_list.append("description: {}".format(",".join(data.get("description"))))
if data.get("mail"):
attr_list.append("add:mail")
attr_list.append("mail: {}".format(data.get("mail")))
if data.get("info"):
attr_list.append("add:info")
attr_list.append("info: {}".format(data.get("info")))
if data.get("groupType"):
attr_list.append("replace:groupType")
attr_list.append("groupType: {}".format(data.get("groupType")))
cmd = ["ldbmodify", "-H", "/var/lib/samba/private/sam.ldb", "/tmp/group.ldif"]
with open("/tmp/group.ldif", "w") as file:
file.write("\n".join(attr_list))
out = subprocess.call(cmd,restore_signals=True, shell=False)
return out
def find_user_by_groups_samba(self, name, ou, base_ou):
str_base_ou = self.__get_str_base_ou(base_ou)
return self._smb.search_samba_ad(f"(memberOf=CN={name},{ou},{str_base_ou})")
def compare_entry_by_group(self, ms:list, name_group:str, samba_data, base):
if len(samba_data) == 0:
for items in ms:
if "person" in items.get("attributes").get("objectClass"):
if self.__is_user_samba(items.get("attributes").get("sAMAccountName")):
self.__add_entry_for_group_samba(name_group, items.get("attributes").get("sAMAccountName"))
if "group" in items.get("attributes").get("objectClass"):
add_name = self.__is_group_samba(items.get("dn"), base)
if add_name is not None:
self.__add_entry_for_group_samba(name_group, add_name)
else:
smb_list_name = []
ms_list_name = []
for i in samba_data:
smb_list_name.append(str(dict(i).get("sAMAccountName")))
for ms_i in ms:
if "person" in ms_i.get("attributes").get("objectClass"):
if ms_i.get("attributes").get("sAMAccountName") not in smb_list_name and self.__is_user_samba(ms_i.get("attributes").get("sAMAccountName")):
self.__add_entry_for_group_samba(name_group, ms_i.get("attributes").get("sAMAccountName"))
logging.info("-> add in group <-")
if "group" in ms_i.get("attributes").get("objectClass"):
if ms_i.get("attributes").get("sAMAccountName") not in smb_list_name:
add_name = self.__is_group_samba(ms_i.get("dn"), base)
if add_name is not None:
self.__add_entry_for_group_samba(name_group, add_name)
logging.info("-> add in group <-")
ms_list_name.append(ms_i.get("attributes").get("sAMAccountName"))
self.__delet_entry(ms_list_name, smb_list_name, name_group)
def __delet_entry(self, buf_ms:list, samba_buf:list, name_group:str):
for i in samba_buf:
if i not in buf_ms:
cmd = ["samba-tool", "group", "removemembers", name_group, i]
out = subprocess.call(cmd, restore_signals=True)
def __is_user_samba(self, name)->bool:
user = self._smb.search_samba_ad(f"(&(objectCategory=person)(objectClass=user)(sAMAccountName={name}))")
if len(user) !=0:
return True
return False
def __is_group_samba(self, ou, base):
path = []
for i in ou.split(","):
if i.split("=")[0] != "DC":
path.append(i)
p = ",".join(path)
group = self._smb.search_samba_ad(f"(&(objectCategory=group)(distinguishedName={p},{self.__get_str_base_ou(base)}))")
for i in group:
if dict(i).get("sAMAccountName"): return str(dict(i).get("sAMAccountName"))
return None
def __add_entry_for_group_samba(self, name_group, name_user)->int:
cmd = ["samba-tool", "group", "addmembers", name_group, name_user]
out = subprocess.call(cmd,restore_signals=True)
if out == 0:
logging.info("Added members to group {} account {}".format(name_group, name_user))
return out
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru'
SEARCH_FREE_MS = "dc=energo,dc=ru"
MS_USER = 'energo\\administrator'
PASSWORD = "P@sww0rd"
PATH_SCV = "List_groups.csv"
SEARCH_FREE_SAMBA = "dc=lenenergo,dc=ru"
def run_script():
logging.info("> Run script <")
ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD)
smb = Samba_provaider(SEARCH_FREE_SAMBA)
manager = Manager(smb, ad)
try:
flag_out = True
count = 0
while flag_out:
data = manager.open_file_list_groups(PATH_SCV)
for items in data:
group_data_ms = manager.find_group(items.get("SamAccountName"), items.get("OU"), SEARCH_FREE_MS)
samba_groups = manager.find_smb_groups(items.get("SamAccountName"), items.get("OU"),SEARCH_FREE_SAMBA)
if samba_groups is None:
logging.info("-> Create group <-")
manager.create_group_for_samba(items.get("SamAccountName"), items.get("OU"), group_data_ms, SEARCH_FREE_SAMBA)
else:
ms_group = manager.find_user_by_groups(items.get("SamAccountName"), items.get("OU"), SEARCH_FREE_MS)
samba_group = manager.find_user_by_groups_samba(items.get("SamAccountName"), items.get("OU"), SEARCH_FREE_SAMBA)
manager.compare_entry_by_group(ms_group.get("entries"), items.get("SamAccountName"), samba_group, SEARCH_FREE_SAMBA)
logging.info("> End script <-")
time.sleep(2)
count += 1
if count > 1: break
except KeyboardInterrupt as kb:
print("ctr+c")
except Exception as ex:
logging.error(ex)
scheduler = sched.scheduler(time.time, time.sleep)
# задание времени выполнения функции
event_time = datetime.datetime.now().replace(hour=12, minute=36, second=0, microsecond=0)
# добавление задания в планировщик
scheduler.enterabs(event_time.timestamp(), 1, run_script, ())
# запуск планировщика
while True:
try:
scheduler.run()
time.sleep(0.5)
except KeyboardInterrupt:
print("exit")
break

146
create_ou_ branch.py Normal file
View File

@ -0,0 +1,146 @@
from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE
from samba.samdb import SamDB
from samba.auth import system_session
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
import samba.param
import logging
from pprint import pprint
import json
import subprocess
import csv
class AD_provaider():
def __init__(self, url:str, serch_tree:str, user, password) -> None:
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
self.__server = Server(url)
self.__connect = Connection(self.__server, user, password, authentication=NTLM)
self.__ad_serch_tree = serch_tree
if self.__connect.bind():
logging.info("status connect AD.........ok")
else:
logging.warning("status connect AD.........error")
def search_ms_ad(self,search_filter ,filter:list = ["*"], dn = None)->dict:
logging.info("search >>>>>>>>>>>>>> AD")
if dn is not None:
self.__connect.search(dn, search_filter, SUBTREE, attributes=filter)
else:
self.__connect.search(self.__ad_serch_tree, search_filter, SUBTREE, attributes=filter)
response = self.__connect.response_to_json()
response = json.loads(response)
response = json.dumps(response, ensure_ascii="utf-8")
return json.loads(response)
class Samba_provaider():
def __init__(self, path) -> None:
self.__lp = samba.param.LoadParm()
self.__lp.load(samba.param.default_path()) #или lp.load("/etc/samba/smb.conf")
self.__sam = SamDB(lp=self.__lp, session_info=system_session())
self.__base = path
def search_samba_ad(self, search_filter, attrs = ["*"]):
res = self.__sam.search(base=self.__base, expression=search_filter, attrs=attrs)
return res
def add_ou(self, name):
out = subprocess.call(['samba-tool', "ou", "create", f"{name}"],restore_signals=True)
if out == 0:
return True
return False
def add_group(self, name_group, ou):
out = subprocess.call(['samba-tool', "group", "add", f"{name_group}", "--groupou", f"{ou}"],restore_signals=True)
if out == 0:
return True
return False
class Manager:
def __init__(self, samba_ad:Samba_provaider, ad:AD_provaider) -> None:
self._smb = samba_ad
self.__ad = ad
def creat_branch_ou(self, data_path_list:list):
for item in data_path_list:
i = item.split(",")
date = [m.split("=") for m in i]
date_list = list(reversed(date))
self.__seach_path_ou_and_add(date_list)
def __seach_path_ou_and_add(self, ou_list:list):
path = ""
try:
for i in ou_list:
if str(i[0]).replace('"', '') == "OU":
if len(path) == 0:
path = path + i[0].replace('"', '') +"="+i[1]
str_add = "add ou -> {} <- status ....ok".format(path)
print("***************************************")
logging.info(str_add)
print("***************************************")
self._smb.add_ou(path)
else:
path = i[0].replace('"', '') +"="+i[1]+","+ path
str_add = "add ou -> {} <- status ....ok".format(path)
print("***************************************")
logging.info(str_add)
print("***************************************")
self._smb.add_ou(path)
except Exception as ex:
logging.warning(ex)
def open_csv_file(self, path)->list:
out_list = []
try:
with open(path, encoding="utf-8") as f:
reader = csv.reader(f, delimiter="\t" )
next(reader)
next(reader)
for row in reader:
out_list.append(row[0].split(";"))
return out_list
except Exception as ex:
logging.error(ex)
print("при открытии файла произошла ошибка необходимо проверить кодировку файла, а также путь до файла")
return None
if __name__ == "__main__":
MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru'
SEARCH_FREE_MS = "dc=energo,dc=ru"
MS_USER = 'energo\\administrator'
PASSWORD = "P@sww0rd"
PATH_CSV = "MigrBatch1.csv"
SEARCH_FREE_SAMBA = "dc=lenenergo,dc=ru"
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logging.info("> Run script <")
ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD)
smb = Samba_provaider(SEARCH_FREE_SAMBA)
manager = Manager(smb, ad)
date_file = manager.open_csv_file(PATH_CSV)
if date_file is not None:
ou_add_list = [ i[5] for i in date_file]
logging.info("run create ou")
manager.creat_branch_ou(ou_add_list)# Создаст структуру OU
logging.info("> END <")

251
create_users.py Normal file
View File

@ -0,0 +1,251 @@
from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE
from samba.samdb import SamDB
from samba.auth import system_session
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
import samba.param
import logging
from pprint import pprint
import json
import subprocess
import csv
class AD_provaider():
def __init__(self, url:str, serch_tree:str, user, password) -> None:
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
self.__server = Server(url)
self.__connect = Connection(self.__server, user, password, authentication=NTLM)
self.__ad_serch_tree = serch_tree
if self.__connect.bind():
logging.info("status connect AD.........ok")
else:
logging.warning("status connect AD.........error")
def search_ms_ad(self,search_filter ,filter:list = ["*"], dn = None)->dict:
logging.info("search >>>>>>>>>>>>>> AD")
if dn is not None:
self.__connect.search(dn, search_filter, SUBTREE, attributes=filter)
else:
self.__connect.search(self.__ad_serch_tree, search_filter, SUBTREE, attributes=filter)
response = self.__connect.response_to_json()
response = json.loads(response)
response = json.dumps(response, ensure_ascii="utf-8")
return json.loads(response)
class Samba_provaider():
def __init__(self, path, password) -> None:
self.__lp = samba.param.LoadParm()
self.__lp.load(samba.param.default_path()) #или lp.load("/etc/samba/smb.conf")
self.__sam = SamDB(lp=self.__lp, session_info=system_session())
self.__base = path
self.__defsult_password = password
def search_samba_ad(self, search_filter, attrs = ["*"], dn = None):
if dn is not None:
res = self.__sam.search(base=dn, expression=search_filter, attrs=attrs)
return res
return self.__sam.search(base=self.__base, expression=search_filter, attrs=attrs)
def add_ou(self, name):
out = subprocess.call(['samba-tool', "ou", "create", f"{name}"],restore_signals=True)
if out == 0:
return True
return False
def add_group(self, name_group, ou):
out = subprocess.call(['samba-tool', "group", "add", f"{name_group}", "--groupou", f"{ou}"], restore_signals=True)
if out == 0:
return True
return False
def create_user(self, data:dict, ou:str)->bool:
cmd = ['samba-tool', "user", "add"]
if data is not None:
data = data.get("entries")[0].get("attributes")
if data.get("sAMAccountName"):
cmd.append(data.get("sAMAccountName"))
cmd.append(self.__defsult_password)
if data.get("givenName"):
cmd.append("--given-name={}".format(data.get("givenName")))
if data.get("sn"):
cmd.append("--surname={}".format(data.get("sn")))
if data.get("telephoneNumber"):
cmd.append("--telephone-number={}".format(data.get("telephoneNumber")))
if data.get("mail"):
cmd.append("--mail-address={}".format(data.get("mail")))
if data.get("company"):
cmd.append("--company={}".format(data.get("company")))
if data.get("department"):
cmd.append("--department={}".format(data.get("department")))
if data.get("title"):
cmd.append("--job-title={}".format(data.get("title")))
if data.get("description"):
cmd.append("--description={}".format(",".join(data.get("description"))))
if data.get("physicalDeliveryOfficeName"):
cmd.append("--physical-delivery-office={}".format(data.get("physicalDeliveryOfficeName")))
if data.get("profilePath"):
cmd.append("--profile-path={}".format(data.get("profilePath")))
if data.get("scriptPath"):
cmd.append("--script-path={}".format(data.get("scriptPath")))
if data.get("wWWHomePage"):
cmd.append("--internet-address={}".format(data.get("wWWHomePage")))
if data.get("initials"):
cmd.append("--initials={}".format(data.get("initials")))
# if data.get("homeDirectory"):
# cmd.append("--home-directory={}".format(data.get("homeDirectory")))
cmd.append("--must-change-at-next-login")
cmd.append("--use-username-as-cn")
cmd.append( "--userou={}".format(ou))
out = subprocess.call(cmd,restore_signals=True, shell=False)
if out == 0:
logging.info( " -> Create user name: {} <-".format(data.get("sAMAccountName")))
self.__add_attr_user(data.get("sAMAccountName"), data)
return True
logging.warning(" -> this user already exists <-")
return False
else:
return False
def __add_attr_user(self, user:str, data:dict)->bool:
logging.info("-> Run func add attr <-")
dn = None
d = self.search_samba_ad("sAMAccountName={}".format(user))
for i in d:
if i.dn is not None:
dn = i.dn
out_str = []
out_str.append("dn: {}".format(str(dn)))
out_str.append("changetype: modify")
if data.get("manager"):
manager_dn = []
for i in data.get("manager").split(","):
if i.split("=")[0] != "DC":
manager_dn.append(i)
for item in str(dn).split(","):
if str(item).split("=")[0] == "DC":
manager_dn.append(item)
out_str.append("add: manager")
out_str.append("manager: {}".format(",".join(manager_dn)))
if data.get("streetAddress"):
out_str.append("add: streetAddress")
out_str.append("streetAddress: {}".format(data.get("streetAddress")))
if data.get("l"):
out_str.append("add: l")
out_str.append("l: {}".format(data.get("l")))
if data.get("st"):
out_str.append("add: st")
out_str.append("st: {}".format(data.get("st")))
if data.get("postalCode"):
out_str.append("add: postalCode")
out_str.append("postalCode: {}".format(data.get("postalCode")))
if data.get("postOfficeBox"):
out_str.append("add: postOfficeBox")
d = ",".join(data.get("postOfficeBox"))
out_str.append("postOfficeBox: {}".format(d))
if data.get("pager"):
out_str.append("add: pager")
out_str.append("pager: {}".format(data.get("pager")))
if data.get("c"):
out_str.append("add: c")
out_str.append("c: {}".format(data.get("c")))
if data.get("co"):
out_str.append("add: co")
out_str.append("co: {}".format(data.get("co")))
if data.get("mobile"):
out_str.append("add: mobile")
out_str.append("mobile: {}".format(data.get("mobile")))
if data.get("ipPhone"):
out_str.append("add: ipPhone")
out_str.append("ipPhone: {}".format(data.get("ipPhone")))
if data.get("info"):
out_str.append("add: info")
out_str.append("info: {}".format(data.get("info")))
if data.get("homePhone"):
out_str.append("add: homePhone")
out_str.append("homePhone: {}".format(data.get("homePhone")))
if data.get("facsimileTelephoneNumber"):
out_str.append("add: facsimileTelephoneNumber")
out_str.append("facsimileTelephoneNumber: {}".format(data.get("facsimileTelephoneNumber")))
try:
cmd = ["ldbmodify", "-H", "/var/lib/samba/private/sam.ldb", "/tmp/{}.ldif".format(user)]
with open("/tmp/{}.ldif".format(user), "w") as file:
file.write("\n".join(out_str))
out = subprocess.call(cmd,restore_signals=True, shell=False)
print(out)
if out == 0:
return True
return False
except Exception as ex:
logging.error(ex)
return False
class Manager():
def __init__(self, smb:Samba_provaider, ad:AD_provaider) -> None:
self.__smb = smb
self.__ad = ad
def open_csv_file(self, path)->list:
out_list = []
try:
with open(path, encoding="utf-8") as f:
reader = csv.reader(f, delimiter="\t" )
next(reader)
next(reader)
for row in reader:
out_list.append(row[0].split(";"))
return out_list
except Exception as ex:
logging.error(ex)
print("при открытии файла произошла ошибка необходимо проверить кодировку файла, а также путь до файла")
return None
def run_add_account(self, users_data:dict):
for elems in users_data:
ou = []
ou_list = elems.get("samba_ad").split(",")
for items in ou_list:
if items.split("=")[0] == "OU":
ou.append(items)
self.__smb.create_user(ad.search_ms_ad("(sAMAccountName={})".format(elems["user_name"])), ",".join(ou))
if __name__ == "__main__":
MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru'
SEARCH_FREE_MS = "dc=energo,dc=ru"
MS_USER = 'energo\\administrator'
PASSWORD = "P@sww0rd"
PATH_CSV = "MigrBatch1.csv"
SEARCH_FREE_SAMBA = "dc=lenenergo,dc=ru"
DEFAULT_PASSWORD_USER = "!passw0rd"
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logging.info("> Run script <")
ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD)
logging.info("> init AD_provaider <")
smb = Samba_provaider(SEARCH_FREE_SAMBA, DEFAULT_PASSWORD_USER)
logging.info("> init Samba_provaider <")
manager = Manager(smb, ad)
data_csv = manager.open_csv_file(PATH_CSV)
data_user = []
for users in data_csv:
d = {}
d["user_name"] = users[2]
d["ms_ou"] = users[4]
d["samba_ad"] = users[5]
data_user.append(d)
manager.run_add_account(data_user)