Lennergo_migrate_script/create_ou_branch.py

149 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE
from samba.samdb import SamDB
from samba.auth import system_session
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
import samba.param
import logging
from pprint import pprint
import json
import subprocess
import csv
class AD_provaider():
    """Small wrapper around an ldap3 connection to a Microsoft AD server.

    The connection is bound with NTLM authentication when the object is
    created; searches are then issued through ``search_ms_ad``.
    """

    def __init__(self, url:str, serch_tree:str, user, password) -> None:
        """Connect and bind to the directory server.

        Args:
            url: Address handed to ``ldap3.Server``.
            serch_tree: Default search base used when ``search_ms_ad`` is
                called without ``dn``.
            user: Bind user name passed to the NTLM bind.
            password: Password for ``user``.

        The bind result is only logged. A failed bind does not raise here,
        so a later search on the unbound connection may fail instead.
        """
        logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
        self.__server = Server(url)
        self.__connect = Connection(self.__server, user, password, authentication=NTLM)
        self.__ad_serch_tree = serch_tree
        if self.__connect.bind():
            logging.info("status connect AD.........ok")
        else:
            logging.warning("status connect AD.........error")

    def search_ms_ad(self, search_filter, filter:list = None, dn = None)->dict:
        """Run a subtree search and return the decoded JSON response.

        Args:
            search_filter: LDAP filter expression.
            filter: Attribute names to request. ``None`` (the default)
                requests all attributes via ``["*"]``.
            dn: Search base. When ``None`` the search base given to the
                constructor is used.

        Returns:
            The connection's ``response_to_json()`` output parsed with
            ``json.loads``.
        """
        # A None sentinel replaces the former mutable default list.
        attributes = ["*"] if filter is None else filter
        search_base = self.__ad_serch_tree if dn is None else dn
        logging.info("search >>>>>>>>>>>>>> AD")
        self.__connect.search(search_base, search_filter, SUBTREE, attributes=attributes)
        # One decode is enough: re-encoding and decoding the parsed JSON
        # again yields the same objects.
        return json.loads(self.__connect.response_to_json())
class Samba_provaider():
    """Access to the local Samba AD: SamDB searches plus ``samba-tool`` calls."""

    def __init__(self, path) -> None:
        """Open the local SamDB with the system session.

        Args:
            path: Base DN used as the search base in ``search_samba_ad``.
        """
        self.__lp = samba.param.LoadParm()
        self.__lp.load(samba.param.default_path())  # or lp.load("/etc/samba/smb.conf")
        self.__sam = SamDB(lp=self.__lp, session_info=system_session())
        self.__base = path

    def search_samba_ad(self, search_filter, attrs = None):
        """Search the SamDB below the configured base DN.

        Args:
            search_filter: LDAP filter expression.
            attrs: Attribute names to return. ``None`` (the default)
                requests all attributes via ``["*"]``.

        Returns:
            Whatever ``SamDB.search`` returns for the query.
        """
        # A None sentinel replaces the former mutable default list.
        if attrs is None:
            attrs = ["*"]
        return self.__sam.search(base=self.__base, expression=search_filter, attrs=attrs)

    @staticmethod
    def _run_samba_tool(*args) -> bool:
        """Run ``samba-tool`` with ``args``; True when it exits with status 0."""
        exit_status = subprocess.call(['samba-tool', *args], restore_signals=True)
        return exit_status == 0

    def add_ou(self, name):
        """Run ``samba-tool ou create <name>``.

        Returns:
            True if the command exited with status 0, otherwise False.
        """
        return self._run_samba_tool("ou", "create", f"{name}")

    def add_group(self, name_group, ou):
        """Run ``samba-tool group add <name_group> --groupou <ou>``.

        Returns:
            True if the command exited with status 0, otherwise False.
        """
        return self._run_samba_tool("group", "add", f"{name_group}", "--groupou", f"{ou}")
class Manager:
    """Reads DNs from a CSV export and recreates their OU hierarchy in Samba."""

    def __init__(self, samba_ad: "Samba_provaider", ad: "AD_provaider") -> None:
        """Store the two directory providers.

        Args:
            samba_ad: Provider whose ``add_ou`` is used to create the OUs.
            ad: Source AD provider; stored but not used by the methods of
                this class.
        """
        self._smb = samba_ad
        self.__ad = ad

    def creat_branch_ou(self, data_path_list:list):
        """Create the OU chain of every DN in ``data_path_list``.

        Each DN is split into ``[key, value]`` pairs and reversed, so that
        parent OUs are created before their children.

        NOTE(review): the DN is split on every "," -- a value containing an
        escaped comma would be split incorrectly.

        Args:
            data_path_list: DN strings such as ``OU=PC,OU=Dept,DC=x,DC=y``;
                surrounding whitespace and double quotes are ignored.
        """
        for item in data_path_list:
            # CSV fields may still carry their surrounding quotes.
            dn = item.strip().strip('"')
            # maxsplit=1 keeps values that themselves contain "=" intact.
            components = [part.split("=", 1) for part in dn.split(",")]
            components.reverse()
            self.__seach_path_ou_and_add(components)

    def __seach_path_ou_and_add(self, ou_list:list):
        """Create each OU of one DN, walking from the outermost OU inwards.

        Non-OU components (for example DC or CN) are skipped. The status
        line is logged after the creation attempt and reflects the result
        returned by ``add_ou``; a failed creation does not stop the walk.

        Args:
            ou_list: ``[key, value]`` pairs ordered from the domain root
                towards the leaf.
        """
        path = ""
        try:
            for component in ou_list:
                # Tolerate stray quotes, surrounding spaces and lower case.
                key = str(component[0]).replace('"', '').strip()
                if key.upper() != "OU":
                    continue
                rdn = "OU=" + component[1]
                # Children are prepended: the nearest parent follows the comma.
                path = f"{rdn},{path}" if path else rdn
                created = self._smb.add_ou(path)
                print("***************************************")
                if created:
                    logging.info("add ou -> {} <- status ....ok".format(path))
                else:
                    logging.warning("add ou -> {} <- status ....error".format(path))
                print("***************************************")
        except Exception as ex:
            # Best effort: a malformed component or a failing samba-tool call
            # abandons this DN only, so the remaining DNs are still processed.
            logging.warning(ex)

    def open_csv_file(self, path)->list:
        """Read the export file and split every data line on ";".

        The first two lines are skipped as headers and blank lines are
        ignored.

        Args:
            path: Path of the UTF-8 encoded file.

        Returns:
            A list with one list of fields per data line (empty when the
            file has no data lines), or None if the file cannot be read.
        """
        try:
            # newline="" is the open mode the csv module expects.
            with open(path, encoding="utf-8", newline="") as f:
                reader = csv.reader(f, delimiter="\t")
                # Skip the two header lines without failing on a short file.
                next(reader, None)
                next(reader, None)
                # csv yields [] for a blank line; skipping it avoids an
                # IndexError that would discard the whole file.
                return [row[0].split(";") for row in reader if row]
        except (OSError, UnicodeError, csv.Error) as ex:
            logging.error(ex)
            print("при открытии файла произошла ошибка необходимо проверить кодировку файла, а также путь до файла")
            return None
if __name__ == "__main__":
    # Script entry point: read the migration CSV and recreate the OU
    # hierarchy it references in the local Samba AD.
    MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru'
    SEARCH_FREE_MS = "dc=energo,dc=ru"
    MS_USER = 'energo\\administrator'
    # NOTE(security): the bind password is hard-coded in plain text; it
    # should come from the environment or a prompt instead of the source.
    PASSWORD = "P@sww0rd"
    PATH_CSV = "MigrBatch1.csv"
    SEARCH_FREE_SAMBA = "dc=lenenergo,dc=ru"
    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
    logging.info("> Run script <")
    ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD)
    smb = Samba_provaider(SEARCH_FREE_SAMBA)
    manager = Manager(smb, ad)
    date_file = manager.open_csv_file(PATH_CSV)
    if date_file is not None:
        try:
            # Columns 5 and 10 hold the DNs whose OU chains are created
            # (presumably user and computer locations -- confirm against
            # the CSV layout).
            ou_add_list = [i[5] for i in date_file]
            ou_comp = [i[10] for i in date_file]
        except IndexError as ex:
            # A row with too few columns: report it and create nothing,
            # instead of continuing with undefined lists.
            logging.error(ex)
        else:
            logging.info("run create ou")
            # Creates the OU structure.
            manager.creat_branch_ou(ou_add_list)
            manager.creat_branch_ou(ou_comp)
    logging.info("> END <")