Files
NetWork/Torrent And Sync.py
2021-10-08 13:52:39 +03:00

212 lines
9.3 KiB
Python

from typing import Sequence, Union
import wmi
import subprocess
import glob
import re
import pywinauto
import pywin
import psutil
import keyboard
processes = [p for p in wmi.WMI().Win32_Process() if p.Name in ["GoogleDriveFS.exe",
"googledrivesync.exe", "qbittorrent.exe", "Surfshark.exe", "mstsc.exe"]]
def rdp(start_rdp_software: bool = True):
'''rdp detects if the rdp software is running on the computer. if the start_rdp_software is true at calling, the function will start the rdp if there is no program that runs.
elsewise, it will close it
:param start_rdp_software: switch. it can be called to run(true) or stop(false) the rdp software, defaults to True
:type start_rdp_software: bool, optional
'''
if start_rdp_software:
running = False
for process in processes:
if process.CommandLine == '"mstsc.exe" "D:\\Drive\\מסמכים\\vms\\משרדוש.rdp"' or process.CommandLine == '"C:\\Windows\\System32\\mstsc.exe" /v:"132.64.161.2"':
running = True
if not running:
subprocess.Popen('"mstsc.exe" "D:\\Drive\\מסמכים\\vms\\משרדוש.rdp"',
creationflags=subprocess.CREATE_NO_WINDOW, stdout=subprocess.PIPE, shell=True)
else:
for process in processes:
if process.CommandLine == '"mstsc.exe" "D:\\Drive\\מסמכים\\vms\\משרדוש.rdp"' or process.CommandLine == '"C:\\Windows\\System32\\mstsc.exe" /v:"132.64.161.2"':
try:
process.Terminate()
except:
continue
def allowd_macs(mac_address: Union[dict, tuple, str]):
allowed = ['00-E0-4C-68-13-E4']
if isinstance(mac_address, str):
if mac_address in allowed:
return True
for allowed_elements in allowed:
if allowed_elements in mac_address:
return True
return False
def start_process(process_list: list):
start = {
"Surfshark.exe": 'C:/Program Files (x86)/Surfshark/Surfshark.exe',
'qbittorrent.exe': "C:/Program Files/qBittorrent/qbittorrent.exe", "GoogleDriveFS.exe": glob.glob(
'C:/Program Files/Google/Drive File Stream/*/GoogleDriveFS.exe', recursive=True)[0],
"googledrivesync.exe": 'C:/Program Files/Google/Drive/googledrivesync.exe'
}
for p in process_list:
start_this_process = False
for k in processes:
if p == k.Name:
start_this_process = True
if not start_this_process:
subprocess.Popen(start[p], shell=True, creationflags=subprocess.CREATE_NO_WINDOW, stdout=subprocess.PIPE)
def stop_process(process_list: list):
for proc in process_list:
for k in processes:
if proc == k.Name:
try:
k.Terminate()
except:
continue
def disallow(process_list: list = ["GoogleDriveFS.exe", "googledrivesync.exe"],
mute: bool = True, run_process: list = None):
stop_process(process_list)
if run_process:
start_process(run_process)
if mute:
subprocess.run(["powershell", "-command", "Set-AudioDevice -PlaybackMute $true"],
shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
else:
subprocess.run(["powershell", "-command", "Set-AudioDevice -PlaybackMute $false"],
shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
def allow(process_list: list = ["Surfshark.exe", 'qbittorrent.exe'], unmute: bool = True):
start_process(process_list)
if unmute:
subprocess.run(["powershell", "-command", "Set-AudioDevice -PlaybackMute $false"],
shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
else:
subprocess.run(["powershell", "-command", "Set-AudioDevice -PlaybackMute $true"],
shell=True, creationflags=subprocess.CREATE_NO_WINDOW)
def _actiavte_rasdial():
pywinauto.findwindows.find_window()
def dial_connect(connection: str = 'Connected to\nek\nCommand completed successfully.\n'):
connected = False if connection == 'Connected to\nek\nCommand completed successfully.\n' else True
if connected:
subprocess.Popen(
'rasphone ek',
shell=True, creationflags=subprocess.CREATE_NO_WINDOW, stdout=subprocess.PIPE)
# _actiavte_rasdial()
# keyboard.press_and_release('alt')
keyboard.press_and_release('enter')
# _actiavte_rasdial()
keyboard.press_and_release('enter')
subprocess.Popen(["powershell", "-command", "Set-AudioDevice -PlaybackMute $false"],
shell=True, creationflags=subprocess.CREATE_NO_WINDOW, stdout=subprocess.PIPE)
def _hiercy(network_status: dict):
if network_status["ek"]:
return "ek"
elif network_status["good_lan"]:
return 'good_lan'
elif network_status["lan"]:
return "lan"
return "Wi-Fi"
def get_net_status(supreme_mac: Union[str, list, tuple, set] = '00-E0-4C-68-13-F8'):
nets, stats = psutil.net_if_addrs(), psutil.net_if_stats()
networks = subprocess.Popen('pwsh -executionPolicy bypass -command "Get-NetConnectionProfile"',
shell=True, stdout=subprocess.PIPE).communicate()[0].decode()
macs = {nets[key][0][1]: key.strip('\u200f') for key in nets if stats[key][0]}
lan_mac = get_lan(supreme_mac, macs)
profiles = [Net[19:].strip('\u200f') for net in networks.split('\r\n\r\n')
for Net in net.split('\r\n') if 'InterfaceAlias :' in Net]
wifi = [net[15:].split('\r\n')[0] for net in networks.split('Name') if 'Wi-Fi' in net]
wifi = wifi[0] if wifi else None
vpn = [key.strip('\u200f') for key in nets if stats[key][0] and re.search(r'(surf|shark)', key)]
return {"Wi-Fi": wifi, "lan": (lan_mac), "ek": ("ek" in profiles), "vpn": vpn, "good_lan": allowd_macs(macs)} if vpn else {"Wi-Fi": wifi, "lan": (lan_mac), "ek": ("ek" in profiles), "good_lan": allowd_macs(macs)}
def get_lan(macs: Union[str, list, tuple, set] = '00-E0-4C-68-13-F8', dict_of_macs: dict = None):
SMAC = ['00-E0-4C-68-13-F8', "00-FF-33-C8-0F-88"]
if isinstance(macs, str) and macs in dict_of_macs:
return True
elif not isinstance(macs, str):
for m in macs:
if m in SMAC:
return True
return False
def get_status(supreme_mac: Union[str, list, tuple, set] = '00-E0-4C-68-13-F8'):
wifi = {
"allowed":
["Oliver", "Oliver5", "Oliver", "x018_497622", "TNCAPE5A34D", "MSBR", "Azrieli_Modiin_WIFI", "lu shalmata",
"mickey", "Mickey", "Network", "192.168.1.", "Silmarill", "wintunshark0", "saret", "Saret", "huji-meonot"],
"disallowed": ["HUJI-netX", "eduroam", "HUJI-guest", "132.64"]}
network_status = get_net_status(supreme_mac)
hierarchy = _hiercy(network_status)
if hierarchy == "Wi-Fi":
current_network = network_status["Wi-Fi"].split(' ')[0]
w = 0 if re.search(current_network, " ".join(wifi["disallowed"])) else 1
return {"Wi-Fi": w, "vpn": network_status["vpn"]} if network_status.get("vpn") else {"Wi-Fi": w}
elif hierarchy == 'good_lan':
return {'good_lan': True}
elif hierarchy == "lan":
return {"lan": network_status["lan"], "vpn": network_status["vpn"]} if network_status.get("vpn") else {"lan": network_status["lan"]}
return {"ek": network_status["ek"], "vpn": network_status["vpn"]} if network_status.get("vpn") else {"ek": network_status["ek"]}
def do_the_schtik(supreme_mac: str = '00-E0-4C-68-13-F8', discontinue: str = None):
network_status = get_status(supreme_mac)
if network_status.get("vpn"):
allow(["qbittorrent.exe", "GoogleDriveFS.exe", "googledrivesync.exe", "Surfshark.exe"])
rdp(False)
if network_status.get("lan"):
dial_connect(discontinue)
elif network_status.get('Wi-Fi') and network_status['Wi-Fi'] == 0:
allow(["qbittorrent.exe", "GoogleDriveFS.exe", "googledrivesync.exe", "Surfshark.exe"], False)
elif network_status.get('Wi-Fi') and network_status['Wi-Fi'] == 1:
allow(["qbittorrent.exe", "GoogleDriveFS.exe", "googledrivesync.exe", "Surfshark.exe"])
rdp(False)
elif network_status.get("good_lan") and network_status["good_lan"]:
allow(["qbittorrent.exe", "GoogleDriveFS.exe", "googledrivesync.exe", "Surfshark.exe"])
rdp(False)
else:
if network_status.get("lan"):
dial_connect(discontinue)
disallow(["Surfshark.exe", 'qbittorrent.exe'], False,
run_process=["GoogleDriveFS.exe", "googledrivesync.exe"])
rdp()
elif network_status.get("ek"):
disallow(["Surfshark.exe", 'qbittorrent.exe'], False,
run_process=["GoogleDriveFS.exe", "googledrivesync.exe"])
rdp()
else:
disallow(["Surfshark.exe", 'qbittorrent.exe'], run_process=["GoogleDriveFS.exe", "googledrivesync.exe"])
def main():
discontinue = subprocess.Popen(
'rasdial',
shell=True, creationflags=subprocess.CREATE_NO_WINDOW, stdout=subprocess.PIPE).communicate()[0].decode().split(
"\r\n")[0]
do_the_schtik(discontinue=discontinue)
exit()
if __name__ == '__main__':
main()
# get_net_status()