File: //opt/microsoft/mdatp/tools/client_analyzer/python/mde_tools/machine.py
import urllib
from .utils import DEBUG_MODE, ONBOARDING_PACKAGE, ONBOARDING_SCRIPTS,COLLECTION_DIR, error
from .utils import run, run_with_output ,wait, OFFBOARDING_PACKAGE, DIY_URL_MACOS, DIY_URL_LINUX, DIY_PACKAGE_MACOS, DIY_PACKAGE_LINUX, LINUX_DIY_SCRIPT
from mde_tools import constants
import logging
import platform
import distro as distro_info
import os, time, re, tempfile
import json
import socket
import threading
import select
log = logging.getLogger(constants.LOGGER_NAME)
class os_details:
platform = None
package_manager = None
version = None
distro = None
def __init__(self):
self.platform, self.distro, self.package_manager, self.version, self.build = self._identify_platform()
log.debug(f"platform: [{self.platform}]")
log.debug(f"distribution: [{self.distro}] version: [{self.version}]")
log.debug(f"package manager: [{self.package_manager}]")
def _identify_platform(self):
if platform.system() == 'Darwin':
sw_vers = run_with_output("sw_vers")
version = self._find_regex(sw_vers, r'ProductVersion:\s+(\S+)')
build = self._find_regex(sw_vers, r'BuildVersion:\s+(\S+)')
return (constants.MACOS_PLATFORM, "Darwin", "brew", version, build)
if platform.system() == "Linux":
package_mapping = {
"debian": "apt", # ubuntu [deb]
"fedora": "yum", # rhel, centos, oracle [rpm]
"sles": "zypper", # sles [rpm]
"centos": "yum",
"azurelinux": "tdnf", # Azure Linux - uses tdnf (Tiny DNF)
"mariner": "dnf" # CBL-Mariner
}
version = distro_info.version(best=True)
for like, package_manager in package_mapping.items():
if like in [distro_info.like(), distro_info.id()]:
return (constants.LINUX_PLATFORM, distro_info.name(), package_manager, version, "")
return (constants.LINUX_PLATFORM, distro_info.name(), None, version, "")
raise NotImplementedError(f"unknown platform: {platform.system()}")
def _find_regex(self, text, expression):
matches = re.findall(expression, text)
if matches is None or len(matches) == 0:
return None
return ''.join(matches[0])
def is_big_sur_and_up(self) -> bool:
if self.platform is constants.LINUX_PLATFORM:
return False
if not self.version:
return False
version_parts = self.version.split(".")
major_ver = version_parts[0] if len(version_parts)>0 else 0
return int(major_ver) >= 11
class package:
name = None
version = None
installed = False
source = None
def __init__(self, name, version = None, is_installed = False, source = None):
self.name = name
self.version = version
self.installed = is_installed
self.source = source
def __repr__(self):
return f'Package name: {self.name}, version: {self.version}, installed? {self.installed}, source: {self.source}'
class machine:
download_folder = None
username = None
hostname = None
platform = None
distro = None
os_version = None
package_manager = None
start_time = time.time()
temp_folder = None
@staticmethod
def get_start_time():
return machine.start_time
@staticmethod
def get_platform():
if machine.platform is None:
details = os_details()
machine.platform = details.platform
machine.package_manager = details.package_manager
machine.os_version = details.version
machine.distro = details.distro
return machine.platform
@staticmethod
def is_installed(appname):
return run_with_output(f"which {appname}") is not None
def is_connected(self, url='http://google.com'):
try:
urllib.request.urlopen(url) # Python 3.x
return True
except:
return False
@staticmethod
def create_multiple_process_events(no_of_events):
for event_number in range(no_of_events):
run(f"ls ./test_{event_number+1} &> /dev/null", False)
time.sleep(0.01)
@staticmethod
def create_multiple_file_events(no_of_events):
possible_paths = ['/bin/ls', 'usr/bin/ls']
possible_paths = [path for path in possible_paths if os.path.exists(path)]
if len(possible_paths) == 0:
raise Exception("could not find executable binary")
exec_path = possible_paths[0]
temp_dir = machine.get_temp_folder()
for event_number in range(no_of_events):
dest_path = os.path.join(temp_dir, f"test_{event_number+1}.ls")
run(f"cp {exec_path} {dest_path}", False)
time.sleep(0.01)
run(f"rm {temp_dir}/test_*")
@staticmethod
def create_multiple_network_events(no_of_events, timeout = 60):
start_time = time.time()
# create multiple network event with IPV6 connections
def connect_event():
try:
ipv6_ip = '2a02:26f0:12f:2a3::356e 80'
machine.create_ipv6_conn(ipv6_ip)
return 1
except Exception as ex:
log.warning(f"failed to complete events: {ex}")
return 0
connection_events = 0
while( connection_events < no_of_events and (time.time() - start_time) < timeout ):
connection_events = connection_events + connect_event()
elapsed_time = time.time() - start_time
log.debug(f"{connection_events} network events created [{elapsed_time:.2f}s]")
return connection_events == no_of_events
@staticmethod
def create_multiple_network_events2(end):
# create multiple network event with IPV4 connections
def connect_non_blocking(ip, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)
try:
s.connect((ip, port))
except socket.error as e:
if e.errno == socket.errno.EINPROGRESS:
pass
else:
return False
return s
def is_connection_complete(sock):
_readable, writable, _exceptional = select.select([], [sock], [], 0)
return sock in writable
def check_connections(ip_list, port):
results = {}
def check_single_connection(ip):
sock = connect_non_blocking(ip, port)
if sock and is_connection_complete(sock):
results[ip] = True
sock.close()
else:
results[ip] = False
threads = []
for ip in ip_list:
thread = threading.Thread(target=check_single_connection, args=(ip,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
return results
# Example usage
ip_list = [f"192.168.1.{i}" for i in range(1, end + 1)]
# ip_list = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
port_number = 8080
results = check_connections(ip_list, port_number)
for ip, success in results.items():
if success:
print(f"Connection to {ip} successful.")
else:
print(f"Connection to {ip} failed.")
@staticmethod
def trigger_mac_login_event():
return run('sudo scutil <<< "notify State:/Users/ConsoleUser"')
@staticmethod
def get_sense_guid(logs):
for logs_dir in [logs, os.path.join(logs, 'rotated')]:
sense_guid_line = machine._get_last_line_from_logs(logs_dir, "senseGuid:")
if sense_guid_line is not None:
return machine._parse_from_json(sense_guid_line, "senseGuid:")
return None
@staticmethod
def find_messages_in_log(logs_dir, key):
messages_array = []
lines = machine._get_last_line_from_logs(logs_dir, key, get_all_lines=True)
if lines is None:
return None
for line in lines:
time_since_epoch = machine._parse_time_from_log_message(line)
if time_since_epoch is None:
continue
if time_since_epoch > machine.start_time:
messages_array.append(line)
if len(messages_array) > 0:
return messages_array
return None
@staticmethod
def _parse_time_from_log_message(line):
time_string_list = re.findall(r'\[.+?\]', line)
if time_string_list is None or len(time_string_list) == 0:
log.info("failed to parse datetime from log mesaage")
return None
# Expected format: [2020-04-28 05:14:25.659656 UTC]
time_obj = time.strptime(time_string_list[2][1:-1].split('.')[0], "%Y-%m-%d %H:%M:%S")
# Return time since epoch in UTC
return time.mktime(time_obj) - time.altzone
@staticmethod
def last_log_message_time(logs_dir, key):
line = machine._get_last_line_from_logs(logs_dir, key)
if line is None:
return None
time_since_epoch = machine._parse_time_from_log_message(line)
return time_since_epoch
@staticmethod
def verify_msg_in_logs(logs_dir, message, ref_time=None, timeout_sec = 5):
if ref_time is None:
ref_time = machine.get_start_time()
log.info(f"Looking for [{message}] in logs")
start_time = time.time()
while time.time() - start_time < timeout_sec:
last_log_msg_time = machine.last_log_message_time(logs_dir, message)
if last_log_msg_time is None:
wait(5, "retry: waiting for msg to appear in log")
continue
delta = last_log_msg_time - ref_time
log.debug(f"last message time: {last_log_msg_time}. delta = {delta}")
if delta < 0:
wait(5, "retry: waiting for updated msg to appear in log")
continue
return True
return False
@staticmethod
def get_hostname():
if machine.hostname is None:
platform = machine.get_platform()
if platform == 'macOS':
machine.hostname = run_with_output("scutil --get LocalHostName")
if platform == 'Linux':
machine.hostname = run_with_output("/bin/hostname")
log.info(f"Hostname: [{machine.hostname}]")
return machine.hostname
@staticmethod
def get_username():
if machine.username is None:
machine.username = str(run_with_output("whoami"))
return machine.username
@staticmethod
def create_executable(output_filename):
input_file = run_with_output("which curl")
return run(f"cp {input_file} {output_filename}", validate_exit_code=True)
@staticmethod
def create_ipv6_conn(ipv6_ip):
return run_with_output("nc 2a00:a040:0:6::d437:ba4a 443", timeout_in_sec=120) == ''
@staticmethod
def create_ipv4_conn(ipv4_ip):
return run_with_output(f"curl {ipv4_ip}", timeout_in_sec=120)
@staticmethod
def download_diy_package(timeout_sec=30):
# TODO: move out of browser class
platform = machine.get_platform()
if platform == "Linux":
diy_url = DIY_URL_LINUX
diy_package = DIY_PACKAGE_LINUX
else:
diy_url = DIY_URL_MACOS
diy_package = DIY_PACKAGE_MACOS
log.debug(f"diy url: [{diy_url}]")
machine.download_file(diy_url, filename=diy_package)
log.debug("downloading...")
start_time = time.time()
while time.time() - start_time < timeout_sec:
package_downloaded = machine.package_downloaded(diy_package)
if package_downloaded:
break
wait(1, "retrying to download DIY package")
if not package_downloaded:
return False
log.debug("package downloaded")
unzipped = machine.unzip_package(diy_package)
log.debug(f"package unzipped: [{unzipped}]")
if platform == "macOS":
return unzipped is not None
elif platform != "Linux":
return False
# Linux:
log.info(f"executing script: {unzipped}")
success = run(f"bash {unzipped}")
log.debug(f"download_diy_package success: {success}")
run(f'rm "{unzipped}"')
return success
@staticmethod
def download_file(url, filename=None, timeout_sec=30):
download_folder = machine.get_download_folder()
if filename is None:
command = f'''cd {download_folder} && curl --connect-timeout {timeout_sec} -L -O "{url}"'''
else:
if os.path.isabs(filename):
download_folder = os.path.dirname(filename)
filename = os.path.basename(filename)
command = f'''cd {download_folder} && curl --connect-timeout {timeout_sec} -L -o "{filename}" "{url}"'''
if machine.get_platform() == 'macOS': ## MacOS curl is different as we used in linux so we installed similar version from brew to make parity on our events
try:
log.info("trying to use newest curl version from brew MacOS")
versions = os.listdir('/usr/local/Cellar/curl/')
if versions:
# Sort the versions
sorted_versions = sorted(versions, key=lambda s: list(map(int, s.split('.'))))
latest_version = sorted_versions[-1]
new_curl_path = f"/usr/local/Cellar/curl/{latest_version}/bin/curl"
if os.path.exists(new_curl_path):
print(f"new_curl_path: {new_curl_path}")
if filename is None:
command = f'''cd {download_folder} && {new_curl_path} --connect-timeout {timeout_sec} -L -O "{url}"'''
else:
command = f'''cd {download_folder} && {new_curl_path} --connect-timeout {timeout_sec} -L -o "{filename}" "{url}"'''
except:
log.warning("could not find new curl version, use default.")
log.debug(f"curl command: {command}")
success = run(command)
log.info(f"file downloaded: {filename} [{'ok' if success else 'fail'}]")
return success
@staticmethod
def file_exists_in_download_folder(filename):
full_name = os.path.join(machine.get_download_folder(), filename)
return os.path.exists(full_name)
@staticmethod
def run_process(command):
return run(command)
@staticmethod
def get_temp_folder():
if machine.temp_folder is None:
machine.temp_folder = tempfile.gettempdir()
return machine.temp_folder
@staticmethod
def set_temp_folder(temp_folder):
machine.temp_folder = temp_folder
log.info(f"temp folder set: [{machine.temp_folder}]")
@staticmethod
def get_collection_folder():
return os.path.join(machine.get_temp_folder(),COLLECTION_DIR)
@staticmethod
def copy_to_collection_folder(filename):
collection_folder = machine.get_collection_folder()
log.debug(f"os.path.exists(collection_folder) {os.path.exists(collection_folder)}")
if not os.path.exists(collection_folder) and not run(f"sudo mkdir {collection_folder}"):
log.error("cannot create temp directory")
return False
log.debug(f"os.path.exists(collection_folder) {collection_folder}, {os.path.exists(collection_folder)} after creation")
if not run(f"sudo cp '{filename}' '{collection_folder}'"):
log.error(f"cannot copy {filename} to collection folder")
return False
return True
@staticmethod
def copy_dir_to_collection_folder(folder_path, target_name):
collection_folder = machine.get_collection_folder()
if not os.path.exists(collection_folder) and not run(f"sudo mkdir {collection_folder}"):
log.error("cannot create temp directory")
return False
if not run(f"sudo cp -r '{folder_path}' '{collection_folder}/{target_name}'"):
log.error(f"cannot copy {folder_path} to collection folder")
return False
return True
@staticmethod
def move_file_to_target_folder(file_path, target_folder):
if not run(f"sudo mv '{file_path}' '{target_folder}'"):
log.error(f"cannot move {file_path} to {target_folder}")
return False
return True
@staticmethod
def get_download_folder():
if machine.download_folder is None:
platform = machine.get_platform()
if platform == 'macOS':
machine.download_folder = os.path.join("/","Users",machine.get_username(),"Downloads")
elif platform == 'Linux':
machine.download_folder = run_with_output("xdg-user-dir DOWNLOAD")
log.debug(f"Download folder: [{machine.download_folder}]")
return machine.download_folder
@staticmethod
def change_permissions(filename):
return run(f"sudo chmod 666 {filename}")
@staticmethod
def get_logs_dir():
platform = machine.get_platform()
if platform == 'macOS':
return '/Library/Logs/Microsoft/mdatp'
if platform == 'Linux':
return '/var/log/microsoft/mdatp'
raise Exception("unknown platform")
@staticmethod
def get_process_ids(proc_names, excluded_processes=[]):
pids = set()
for proc_name in proc_names:
command = f"ps aux | grep {proc_name} | grep -v grep"
for excluded_proc in excluded_processes:
command = command + f" | grep -v {excluded_proc}"
result = os.popen(command).read()
if result is None:
continue
lines = result.strip().split('\n')
for line in lines:
parts = line.split()
if len(parts)>1:
pids.add(parts[1])
return list(pids)
@staticmethod
def _check_duplicates_process(output):
pids = []
new_output = []
for process in output:
pid = process.split()[1]
if pid in pids:
continue
pids.append(pid)
new_output.append(process)
return new_output
@staticmethod
def get_process_info(process_names):
output = []
for proc_name in process_names:
process_data = os.popen(f"ps aux | grep {proc_name} | grep -v grep").read().strip().split("\n")
if process_data != None or process_data != '':
[output.append(process) for process in process_data if process not in output and process != None and process != '']
new_output = machine._check_duplicates_process(output)
return "\n".join(new_output)
@staticmethod
def unzip_package(package_name,timeout = 30):
end_time = time.time() + timeout
file_name = machine.get_package_filename(package_name)
if not file_name:
log.debug(f"package was not found in download folder [{package_name}]")
return None
download_folder = machine.get_download_folder()
if package_name != file_name:
package_name = file_name
unzip_output = run_with_output(f'unzip -o "{os.path.join(download_folder, package_name)}" -d {download_folder}', verbose=True)
log.info(f'unzip -o "{os.path.join(download_folder, package_name)}" -d {download_folder}')
while time.time() < end_time:
if unzip_output is not None:
break
wait(2, "wait for the unzip to finish")
script_name = re.findall("inflating: (.+?)(?:\n|$)", unzip_output)[0].strip()
log.debug(f"script name: {script_name}")
return script_name
@staticmethod
def get_package_filename(package_filename, timeout=15):
start_time = machine.get_start_time()
end_time = time.time() + timeout
while time.time() < end_time:
for filename in os.listdir(machine.get_download_folder()):
if ".crdownload" in filename:
continue
# take the package that was created after the test had started
file_create_time = os.path.getmtime(os.path.join(machine.get_download_folder(), filename))
if package_filename in filename and file_create_time > start_time:
return filename
wait(1, "wait for package to download")
return None
@staticmethod
def clean_downloads_folder():
run(f"rm -rf {os.path.join(machine.get_download_folder(), '*DefenderATP*')}")
@staticmethod
def package_downloaded(package_filename, timeout = 120):
return machine.get_package_filename(package_filename,timeout) is not None
@staticmethod
def delete_package_files(zip_package,script_name):
run(f"rm -rf {os.path.join(machine.get_download_folder(), zip_package)}")
run(f"rm -rf {os.path.join(machine.get_download_folder(), script_name)}")
@staticmethod
def delete_diy_files():
run("rm -rf ~/Downloads/*DIY*")
run("rm -rf ~/Downloads/__MACOSX")
@staticmethod
def _get_onboarding_script():
download_folder = machine.get_download_folder()
files = os.listdir(download_folder)
onboarding_script = None
for script in ONBOARDING_SCRIPTS:
if script in files:
onboarding_script = script
return onboarding_script
@staticmethod
def _get_last_line_from_logs(logs_dir, expression, get_all_lines=False):
temp_filename = os.path.join(machine.get_temp_folder(), "config_temp")
# read all enterprise logs into a single file
run(f"sudo cat {logs_dir}/microsoft_defender_enterprise* > {temp_filename}", False)
# read lines from combined log
output = run_with_output(f"cat {temp_filename}", verbose=False)
if output is None:
return None
lines = output.split('\n')
lines = [line for line in lines if expression in line]
# delete temp file
run(f"rm -rf {temp_filename}", False)
if len(lines) == 0:
return None
if get_all_lines:
return lines
return lines[-1]
@staticmethod
def get_crash_dump_dir():
platform = machine.get_platform()
if platform == 'macOS':
return '/Library/logs/DiagnosticReports'
if platform == 'Linux':
return '/var/crash'
raise Exception("unknown platform")
@staticmethod
def _parse_from_json(line, key):
#matches = re.findall(f'\"{key}\":\"(.+?)\"', line)
matches = re.findall(f'{key}(.*?)(?:,|$)', line)
if matches is None or len(matches) == 0:
return None
return matches[0]
@staticmethod
def query_installed_package(package_name):
command = None
curr_distro_likes = distro_info.like().split(' ')
curr_distro_likes.append(distro_info.id())
if [like for like in curr_distro_likes if like == "debian"]:
command = f"dpkg-query -W -f=\'\\{{\"Name\":\"${{binary:Package}}\", \"Version\": \"${{Version}}\", \"Status\": \"${{db:Status-Status}}\", \"Source\": \"${{Source}}\"}}\' {package_name}"
elif [like for like in curr_distro_likes if like in ["sles", "centos", "fedora", "rhel", "mariner", "azurelinux"]]:
command = f"rpm -qa --qf \'\\{{ \"Name\":\"%{{NAME}}\", \"Version\": \"%{{VERSION}}-%{{RELEASE}}\", \"Status\": \"installed\", \"Source\": \"%{{SOURCEPACKAGE}}\"\\}}\' {package_name}"
else:
log.info(f'Could not fetch package details for package {package_name}')
return package(package_name)
result = run_with_output(cmd=command, verbose=False)
try:
res_dict = json.loads(result)
is_installed = True if res_dict["Status"] == "installed" else False
return package(name = res_dict['Name'], version = res_dict["Version"], is_installed = is_installed, source = res_dict['Source'])
except:
return package(package_name)