Lennergo_migrate_script/create_users.py

251 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from ldap3 import Server, Connection, ALL, NTLM, SUBTREE, SAFE_SYNC, BASE
from samba.samdb import SamDB
from samba.auth import system_session
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
import samba.param
import logging
from pprint import pprint
import json
import subprocess
import csv
class AD_provaider():
    """Read-only client for the source Microsoft AD, built on ldap3."""

    def __init__(self, url:str, serch_tree:str, user, password) -> None:
        """Create the ldap3 connection and try to bind with NTLM.

        Args:
            url: LDAP URL of the domain controller.
            serch_tree: default search base used when a search gives no ``dn``.
            user: bind account, passed to ldap3 as-is.
            password: password of the bind account.

        A failed bind is only logged; no exception is raised here.
        """
        logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
        self.__server = Server(url)
        self.__connect = Connection(self.__server, user, password, authentication=NTLM)
        self.__ad_serch_tree = serch_tree
        if self.__connect.bind():
            logging.info("status connect AD.........ok")
        else:
            logging.warning("status connect AD.........error")

    def search_ms_ad(self,search_filter ,filter:list = ["*"], dn = None)->dict:
        """Run a subtree search and return the response as a plain dict.

        Args:
            search_filter: LDAP filter string.
            filter: attribute names to fetch (the name shadows the builtin but
                is kept for callers). The list default is never mutated here.
            dn: search base; the default search tree is used when ``None``.

        Returns:
            The parsed output of ``Connection.response_to_json()``.
        """
        logging.info("search >>>>>>>>>>>>>> AD")
        base = self.__ad_serch_tree if dn is None else dn
        self.__connect.search(base, search_filter, SUBTREE, attributes=filter)
        # response_to_json() already yields a JSON document; one parse is
        # enough, a dumps()/loads() round trip would return the same object.
        return json.loads(self.__connect.response_to_json())
class Samba_provaider():
    """Creates OUs, groups and users in the local Samba AD.

    Reads go through ``SamDB``; writes shell out to ``samba-tool`` and
    ``ldbmodify``.
    """

    # (source attribute, ``samba-tool user add`` option) pairs, in the order
    # the options are appended to the command line.
    _USER_ADD_OPTIONS = (
        ("givenName", "--given-name"),
        ("sn", "--surname"),
        ("telephoneNumber", "--telephone-number"),
        ("mail", "--mail-address"),
        ("company", "--company"),
        ("department", "--department"),
        ("title", "--job-title"),
        ("description", "--description"),
        ("physicalDeliveryOfficeName", "--physical-delivery-office"),
        ("profilePath", "--profile-path"),
        ("scriptPath", "--script-path"),
        ("wWWHomePage", "--internet-address"),
        ("initials", "--initials"),
    )

    # Attributes copied one-to-one by __add_attr_user, in output order
    # ("manager" is handled separately because its DN is rewritten).
    _LDIF_ATTRIBUTES = (
        "streetAddress",
        "l",
        "st",
        "postalCode",
        "postOfficeBox",
        "pager",
        "c",
        "co",
        "mobile",
        "ipPhone",
        "info",
        "homePhone",
        "facsimileTelephoneNumber",
    )

    def __init__(self, path, password) -> None:
        """Open the local SAM database.

        Args:
            path: default search base for ``search_samba_ad``.
            password: initial password given to every created user.
        """
        self.__lp = samba.param.LoadParm()
        self.__lp.load(samba.param.default_path())  # or lp.load("/etc/samba/smb.conf")
        self.__sam = SamDB(lp=self.__lp, session_info=system_session())
        self.__base = path
        self.__defsult_password = password

    @staticmethod
    def _as_text(value):
        """Render an attribute value as text; list values are joined with ','."""
        if isinstance(value, (list, tuple)):
            return ",".join(str(item) for item in value)
        return str(value)

    @staticmethod
    def _is_dc_component(component):
        """Tell whether one ``key=value`` DN component is a DC component."""
        return component.split("=")[0].strip().upper() == "DC"

    @staticmethod
    def _ldif_line(attribute, value):
        """Build one LDIF ``attribute: value`` line.

        Values that would break the line structure (embedded line breaks,
        leading space/colon/'<', trailing space) are written base64-encoded
        with the ``::`` form from RFC 2849.
        """
        import base64  # local import: only this helper needs it

        text = Samba_provaider._as_text(value)
        needs_base64 = (
            any(ch in text for ch in ("\n", "\r", "\0"))
            or text[:1] in (" ", ":", "<")
            or text.endswith(" ")
        )
        if needs_base64:
            encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
            return "{}:: {}".format(attribute, encoded)
        return "{}: {}".format(attribute, text)

    def search_samba_ad(self, search_filter, attrs = ["*"], dn = None):
        """Search the local SAM database.

        Args:
            search_filter: ldb search expression.
            attrs: attribute names to return (the list default is not mutated).
            dn: search base; the base given to the constructor when ``None``.

        Returns:
            Whatever ``SamDB.search`` returns for the query.
        """
        base = self.__base if dn is None else dn
        return self.__sam.search(base=base, expression=search_filter, attrs=attrs)

    def add_ou(self, name):
        """Run ``samba-tool ou create``; return True when it exits with 0."""
        out = subprocess.call(['samba-tool', "ou", "create", f"{name}"],restore_signals=True)
        return out == 0

    def add_group(self, name_group, ou):
        """Run ``samba-tool group add --groupou``; return True on exit code 0."""
        out = subprocess.call(['samba-tool', "group", "add", f"{name_group}", "--groupou", f"{ou}"], restore_signals=True)
        return out == 0

    def create_user(self, data:dict, ou:str)->bool:
        """Create one user with ``samba-tool user add``.

        Args:
            data: a search result shaped like ``{"entries": [{"attributes":
                {...}}]}``; only the first entry is used.
            ou: value for ``--userou``.

        Returns:
            True when samba-tool exits with 0. False when ``data`` is None, has
            no entries, lacks ``sAMAccountName``, or samba-tool fails.
        """
        if data is None:
            return False
        entries = data.get("entries") or []
        if not entries:
            # The source search found nothing; there is no user to copy.
            logging.warning(" -> no entry found in source directory, user skipped <-")
            return False
        attributes = entries[0].get("attributes") or {}
        user_name = attributes.get("sAMAccountName")
        if not user_name:
            # Without a name the password would become the first positional
            # argument of samba-tool, so stop before building the command.
            logging.warning(" -> entry has no sAMAccountName, user skipped <-")
            return False

        # NOTE(review): the password is passed as a command-line argument and
        # is visible in the process list while samba-tool runs.
        cmd = ['samba-tool', "user", "add", user_name, self.__defsult_password]
        for attribute, option in self._USER_ADD_OPTIONS:
            value = attributes.get(attribute)
            if value:
                cmd.append("{}={}".format(option, self._as_text(value)))
        # if attributes.get("homeDirectory"):
        #     cmd.append("--home-directory={}".format(attributes.get("homeDirectory")))
        cmd.append("--must-change-at-next-login")
        cmd.append("--use-username-as-cn")
        cmd.append("--userou={}".format(ou))

        out = subprocess.call(cmd,restore_signals=True, shell=False)
        if out == 0:
            logging.info( " -> Create user name: {} <-".format(user_name))
            self.__add_attr_user(user_name, attributes)
            return True
        # A non-zero exit is not necessarily "already exists"; report the code.
        logging.warning(" -> samba-tool user add failed (exit code {}), the user may already exist <-".format(out))
        return False

    def __add_attr_user(self, user:str, data:dict)->bool:
        """Add the remaining attributes to a created user through ``ldbmodify``.

        Args:
            user: sAMAccountName of the user to modify.
            data: attribute dict of the source entry.

        Returns:
            True when ldbmodify exits with 0 or there is nothing to add; False
            when the user is not found, ldbmodify fails, or the file cannot be
            written.
        """
        import tempfile  # local import: only this method needs it

        logging.info("-> Run func add attr <-")
        dn = None
        for entry in self.search_samba_ad("sAMAccountName={}".format(user)):
            if entry.dn is not None:
                dn = entry.dn
        if dn is None:
            logging.error(" -> user {} not found in Samba, attributes not added <-".format(user))
            return False

        changes = []
        manager = data.get("manager")
        if manager:
            # Keep the manager's own path but replace the source domain
            # components with the DC components of the new user's DN.
            manager_dn = [part for part in manager.split(",") if not self._is_dc_component(part)]
            manager_dn.extend(part for part in str(dn).split(",") if self._is_dc_component(part))
            changes.append("add: manager")
            changes.append(self._ldif_line("manager", ",".join(manager_dn)))
        for attribute in self._LDIF_ATTRIBUTES:
            value = data.get(attribute)
            if value:
                changes.append("add: {}".format(attribute))
                changes.append(self._ldif_line(attribute, value))
        if not changes:
            # Nothing to add; skip running ldbmodify on an empty record.
            return True

        lines = ["dn: {}".format(str(dn)), "changetype: modify"] + changes
        try:
            # A private temporary file replaces the predictable /tmp/<user>.ldif
            # path and is removed when the block exits.
            with tempfile.NamedTemporaryFile("w", suffix=".ldif", encoding="utf-8") as ldif_file:
                ldif_file.write("\n".join(lines) + "\n")
                ldif_file.flush()  # ldbmodify reads the file by name
                cmd = ["ldbmodify", "-H", "/var/lib/samba/private/sam.ldb", ldif_file.name]
                out = subprocess.call(cmd,restore_signals=True, shell=False)
        except (OSError, ValueError) as ex:
            logging.error(ex)
            return False
        logging.debug("ldbmodify exit code: %s", out)
        return out == 0
class Manager():
    """Reads the migration CSV and creates each listed account in Samba."""

    def __init__(self, smb:"Samba_provaider", ad:"AD_provaider") -> None:
        """Store the two providers.

        Args:
            smb: target provider; its ``create_user`` is called per account.
            ad: source provider; its ``search_ms_ad`` is called per account.
        """
        self.__smb = smb
        self.__ad = ad

    def open_csv_file(self, path)->list:
        """Read the migration CSV file.

        The first two lines are skipped. Every other non-empty line is split
        on ``;`` (a tab ends the part of the line that is used).

        Args:
            path: path of a UTF-8 encoded file.

        Returns:
            A list of rows, each a list of strings, or ``None`` when the file
            cannot be read or decoded.
        """
        out_list = []
        try:
            with open(path, encoding="utf-8") as f:
                reader = csv.reader(f, delimiter="\t" )
                # Skip the two leading lines; a shorter file yields no rows.
                next(reader, None)
                next(reader, None)
                for row in reader:
                    if not row:
                        # Blank line: csv yields an empty row, nothing to split.
                        continue
                    out_list.append(row[0].split(";"))
            return out_list
        except (OSError, UnicodeError, csv.Error) as ex:
            logging.error(ex)
            print("при открытии файла произошла ошибка необходимо проверить кодировку файла, а также путь до файла")
            return None

    def run_add_account(self, users_data:list):
        """Create one Samba user per item of ``users_data``.

        Args:
            users_data: iterable of dicts with the keys ``user_name`` (the
                sAMAccountName looked up in the source AD) and ``samba_ad``
                (a target DN; only its ``OU=`` components are used).
        """
        for elems in users_data:
            ou = [part for part in elems.get("samba_ad").split(",") if part.split("=")[0] == "OU"]
            # Use the provider given to the constructor, not a module global.
            found = self.__ad.search_ms_ad("(sAMAccountName={})".format(elems["user_name"]))
            self.__smb.create_user(found, ",".join(ou))
if __name__ == "__main__":
    # Script entry point: read the CSV batch, then create every listed
    # account in Samba from its entry in the source AD.
    # NOTE(review): credentials and the default user password are hard-coded
    # below; move them out of the source before sharing this file.
    MS_AD_ADRESS = 'ldap://cp-vm-dc01.energo.ru'
    SEARCH_FREE_MS = "dc=energo,dc=ru"
    MS_USER = 'energo\\administrator'
    PASSWORD = "P@sww0rd"
    PATH_CSV = "MigrBatch1.csv"
    SEARCH_FREE_SAMBA = "dc=lenenergo,dc=ru"
    DEFAULT_PASSWORD_USER = "!passw0rd"
    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
    logging.info("> Run script <")
    ad = AD_provaider(MS_AD_ADRESS, SEARCH_FREE_MS, MS_USER, PASSWORD)
    logging.info("> init AD_provaider <")
    smb = Samba_provaider(SEARCH_FREE_SAMBA, DEFAULT_PASSWORD_USER)
    logging.info("> init Samba_provaider <")
    manager = Manager(smb, ad)
    data_csv = manager.open_csv_file(PATH_CSV)
    if data_csv is None:
        # open_csv_file has already reported the reason.
        logging.error("> CSV file %s could not be read, nothing to do <", PATH_CSV)
        raise SystemExit(1)
    data_user = []
    for users in data_csv:
        if len(users) < 6:
            # Columns 2, 4 and 5 are read below; shorter rows cannot be used.
            logging.warning("> skip CSV row with too few columns: %s <", users)
            continue
        data_user.append({
            "user_name": users[2],
            "ms_ou": users[4],
            "samba_ad": users[5],
        })
    manager.run_add_account(data_user)