File: //opt/microsoft/mdatp/tools/client_analyzer/python/mde_tools/support_tool.py
#!/usr/bin/python3
import argparse
import datetime
import os
import sys
import shutil
import typing
import zipfile
import tempfile
import stat
import sh
from .report import xml_report_root, xml_functions
from .diagnostic import diagnostic_functions, SystemMonitor
from .connectivity_test import perform_test
from .machine import os_details, machine
from .exclude import exclude
from .certinfocollection import certinfocollection
from .rate_limiter import rate_limiter
from .skip_faulty_rules import skip_faulty_rules
from .pii_disclaimer import present_disclaimer
from . import installation_report
from . import perf
from . import constants, logger
from .mdatp import mdatp
from .perf_trace import perf_trace
from .utils import command_exists
from .spike import observe_cpu_mem_spikes
# Add module directory to import paths array
from lxml import etree
from enum import Enum
from . import filesystem as fs
from .mdatp import mdatp
from . import SCRIPT_VERSION
from .utils import log, run, run_with_output
class PrintHelp(Enum):
NONE = 0
MAIN = 1
CONNECTIVITYTEST = 2
global log
def find_rootlogger_basefilename():
"""Finds the root logger base filename
"""
log_file = None
for h in log.__dict__['handlers']:
if h.__class__.__name__ == 'FileHandler':
log_file = h.baseFilename
break
elif h.__class__.__name__ == 'TimedRotatingFileHandler':
log_file = h.baseFilename
break
return log_file
def export_report_folder(files_to_copy: typing.Dict[str, typing.Union[str, typing.List[str]]], output_path: str):
print(files_to_copy)
# Create report folder
os.makedirs(output_path, exist_ok=True, mode=stat.S_IRUSR)
for path_in_dir, file_to_copy in files_to_copy.items():
dst_in_folder_path = os.path.join(output_path, path_in_dir)
# Create necessary dirs if not exists
os.makedirs(dst_in_folder_path, exist_ok=True)
if isinstance(file_to_copy, list):
for f in file_to_copy:
shutil.copy2(f, dst_in_folder_path)
else:
shutil.copy2(file_to_copy, dst_in_folder_path)
# Export log
baseFilename = find_rootlogger_basefilename()
shutil.copyfile(baseFilename, os.path.join(output_path, 'log.txt'))
if not constants.IS_COMPILED_AS_BINARY:
# Export XML
etree.ElementTree(xml_report_root).write(os.path.join(output_path, 'mde.xml'), pretty_print=True)
# Add events.xml file to directory
shutil.copyfile(constants.XML_EVENTS_PATH, os.path.join(output_path, constants.XML_FILENAME))
# Export HTML
xslt = etree.parse(constants.XSLT_REPORT_PATH)
with open(os.path.join(output_path, 'report.html'), 'wb') as writer:
writer.write(etree.tostring(etree.XSLT(xslt)(xml_report_root), pretty_print=True))
log.info(f'Folder created at: {output_path}')
def process_file(file_to_copy, path_in_zip, zip_writer):
if file_to_copy is None or not (os.path.isfile(file_to_copy) or os.path.isdir(file_to_copy)):
return
os.chmod(file_to_copy, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
zip_writer.write(file_to_copy, arcname=path_in_zip if os.path.basename(path_in_zip) else os.path.join(os.path.dirname(path_in_zip), os.path.basename(file_to_copy)))
def export_report_archive(files_and_data_to_copy: typing.Dict[str, typing.Union[str, typing.List[str]]], output_path: str):
print(f"[outfiles] => {files_and_data_to_copy}")
with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zip_writer:
for path_in_zip, file_to_copy in files_and_data_to_copy.items():
if isinstance(file_to_copy, list):
for f in file_to_copy:
process_file(f, path_in_zip, zip_writer)
else:
# Save the file as the given name (key of the dictionary) or if only directory exists save it as the original filename
process_file(file_to_copy, path_in_zip, zip_writer)
# Export log
baseFilename = find_rootlogger_basefilename()
os.chmod(baseFilename, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
zip_writer.write(baseFilename, arcname='log.txt')
# Export XML
zip_writer.writestr('mde.xml', etree.tostring(xml_report_root, pretty_print=True))
# Currently parsing XSLT crashing in our static binary
if not constants.IS_COMPILED_AS_BINARY and not os.environ.get("E2E_TEST", False):
# Add events.xml file to zip
os.chmod(constants.XML_EVENTS_PATH, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
zip_writer.write(constants.XML_EVENTS_PATH, arcname=constants.XML_FILENAME)
# Export HTML
xslt = etree.parse(constants.XSLT_REPORT_PATH)
zip_writer.writestr('report.html', etree.tostring(etree.XSLT(xslt)(xml_report_root), pretty_print=True))
os.chmod(output_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
log.info(f'Archive created at: {output_path}')
# clean up
for path_in_zip, file_to_copy in files_and_data_to_copy.items():
if not isinstance(file_to_copy, list):
if file_to_copy is not None and '/tmp/mde_support_' in file_to_copy:
dir_path = os.path.dirname(file_to_copy)
if os.path.exists(dir_path):
shutil.rmtree(dir_path)
def collect_diagnostic(args):
log.info('[MDE Diagnostic]')
files_dict = {}
for func_name, func in diagnostic_functions.items():
try:
log.info(f' Collecting {func_name}')
func_result = func(args)
# Update files_dict only if function returned a non-empty valid dict
if isinstance(func_result, dict) and func_result:
log.info(f' Adding {", ".join(func_result.keys())} to report directory')
files_dict.update(func_result)
except Exception as e:
log.error(f" Diagnostics collection encountered an issue at function {func_name} - {str(e)}")
return files_dict
def generate_report_xml():
log.info('[Report Generator]')
for func_name, func in xml_functions.items():
try:
log.debug(f' Generating {func_name}')
func()
except Exception as e:
log.error(f" Report generator encountered an issue at function {func_name} - {str(e)}")
def log_tool_info():
log.info(f'XMDEClientAnalyzer Version: {SCRIPT_VERSION}')
def period(duration):
if duration.lower().endswith('d'):
return datetime.timedelta(days=int(duration[:-1]))
elif duration.lower().endswith('h'):
return datetime.timedelta(hours=int(duration[:-1]))
elif duration.lower().endswith('m'):
return datetime.timedelta(minutes=int(duration[:-1]))
raise argparse.ArgumentTypeError('Wrong time duration. Allowed xd(for days), xh(for hours), xm(for minutes)')
def delay_range_limited_int_type(arg):
try:
value = int(arg)
except ValueError:
raise argparse.ArgumentTypeError(f"Must be a base-10 integer: {arg}")
if value < 0 or value > 2880:
raise argparse.ArgumentTypeError(f"The delay argument must be between 1 and 2880 inclusive: {arg}")
return value
def get_parser():
parser = argparse.ArgumentParser(description='MDE Diagnostics Tool')
parser.add_argument('--output', '-o',
type=str,
help='Output path to export report')
parser.add_argument('--outdir',
type=str,
help='Directory where diagnostics file will be generated.')
parser.add_argument('--no-zip', '-nz',
action='store_true',
help='If set a directory will be created instead of an archive file.')
parser.add_argument('--force', '-f',
action='store_true',
help='Will overwrite if output directory exists.')
parser.add_argument('--diagnostic', '-d',
action='store_true',
help='Collect extensive machine diagnostic information.')
parser.add_argument('--skip-mdatp',
action='store_true',
help='Skip any mdatp command. Use this when the mdatp command is unresponsive.')
parser.add_argument('--bypass-disclaimer',
action='store_true',
help='Do not display disclaimer banner.')
parser.add_argument('--interactive', '-i',
action='store_true',
help='Interactive diagnostic,')
parser.add_argument('--delay', '-dd',
type=delay_range_limited_int_type,
help='Delay diagnostic by how many minutes (0~2880), use this to wait for more debug logs before it collects.')
parser.add_argument('--mdatp-log',
choices={'error', 'warning', 'info', 'debug', 'verbose', 'trace'},
help='Set MDATP log level. If you use interactive or delay mode, the log level will set to debug automatically, and reset after 48h.')
parser.add_argument('--max-log-size',
type=int,
help='Maximum log file size in MB before rotating(Will restart mdatp).')
parser.set_defaults(performance=False)
parser.set_defaults(installation=False)
parser.set_defaults(exclude=False)
parser.set_defaults(trace=False)
parser.set_defaults(ratelimit=False)
parser.set_defaults(skipfaultyrules=False)
parser.set_defaults(connectivitytest=False)
parser.set_defaults(certinfocollection=False)
parser.set_defaults(observespikes=False)
subparsers = parser.add_subparsers()
certinfocollection_subparser = subparsers.add_parser("certinfocollection", help='Collect cert information: Subject name and Hashes')
certinfocollection_subparser.add_argument(dest='target_file', help='target directory to store installed certs information')
perf_subparser = subparsers.add_parser("performance", help='Collect extensive machine performance tracing for analysis of a performance scenario that can be reproduced on demand')
perf_subparser.add_argument('--frequency', type=int, default=3000, help="profile at this frequency")
perf_subparser.add_argument('--length', type=int, default=10, help="length of time to collect (in seconds)")
installation_subparser = subparsers.add_parser("installation", help='Collect different installation/onboarding reports')
installation_subparser.add_argument('-d', '--distro', help="Check for distro support", action="store_true")
installation_subparser.add_argument('-m', '--min-requirement', help="Check for the system info against offical minimum requirements", action="store_true")
installation_subparser.add_argument('-e', '--external-dep', help="Check for externel package dependency", action="store_true")
installation_subparser.add_argument('-c', '--connectivity', help="Check for connectivity for services used by MDE", action="store_true")
installation_subparser.add_argument('-a', '--all', help="Run all checks", action="store_true")
installation_subparser.add_argument("-o", '--onboarding-script', type=str, help="Path to onboarding script")
installation_subparser.add_argument("-g", '--geo', type=str, help="Geo string to test <US|UK|EU|AU|CH|IN>")
exclude_subparser = subparsers.add_parser("exclude", help="Exclude specific process(es) from audit-d monitoring.")
exclude_subparser.add_argument("-a", "--arch", help="cpu architecture, used in arch specific syscalls. available: 32, 64. default: 64", type=str, metavar='<32/64>')
exclude_subparser.add_argument("-e", "--exe", help="exclude by executable name, i.e: bash", action='append', metavar='<executable>')
exclude_subparser.add_argument("-p", "--pid", help="exclude by process id, i.e: 911", type=int, action='append', metavar='<process id>')
exclude_subparser.add_argument("-d", "--dir", help="exclude by target path, i.e: /var/foo/bar", action='append', metavar='<directory>')
exclude_subparser.add_argument("-x", "--exe_dir", help="exclude by executable path and target path, i.e: /bin/bash /var/foo/bar", action='append', nargs=2, metavar=('<executable>','<directory>'))
exclude_subparser.add_argument("-q", "--queue", help="set dispatcher q_depth size", type=int, metavar='<q_size>')
exclude_subparser.add_argument("-r", "--remove", help="remove exclusion file", action='store_true')
exclude_subparser.add_argument("-s", "--stat", help="get statistics about common executables", action='store_true')
exclude_subparser.add_argument("-l", "--list", help="list auditd rules", action='store_true')
exclude_subparser.add_argument("-o", "--override", help="Override the existing auditd exclusion rules file for mdatp", action='store_true')
exclude_subparser.add_argument("-c", "--syscall", help="exclude all process of the given syscall", action='append', metavar='<syscall number>')
rate_limit_subparser = subparsers.add_parser("ratelimit", help="Set the rate limit for auditd events. Rate limit will update the limits for auditd events for all the applications using auditd, which could impact applications other than MDE.")
rate_limit_subparser.add_argument("-e", "--enable", help="enable/disable the rate limit with default values", type=str, metavar='<true/false>')
rate_limit_subparser.add_argument("-r", "--rate", help=argparse.SUPPRESS, type=int, metavar='<rate limit>')
skip_faulty_rules_subparser = subparsers.add_parser("skipfaultyrules", help="Continue loading rules in spite of an error. This summarizes the results of loading the rules. The exit code will not be success if any rule fails to load.")
skip_faulty_rules_subparser.add_argument("-e", "--enable",
help="enable/disable loading of rules in spite of an error.",
default='true',
choices={'true', 'false'})
tracing_subparser = subparsers.add_parser('trace',help='Use OS tracing facilities to record Defender performance traces.', epilog='On Linux, lttng needs to be installed')
tracing_subparser.add_argument('--length', default=500, help='Length of time to record the trace (in seconds).', type=int)
tracing_subparser.add_argument('--mask', default=18446744073709551615, help='Mask to select with event to trace. Defaults to all')
spikes_subparser = subparsers.add_parser("observespikes", help='Collect the process logs in case of spike or mdatp crash')
spikes_subparser.add_argument('--upload', action='store_true', help='Upload to azure storage account')
spikes_subparser.add_argument('--account-name', help='Azure storage account name')
spikes_subparser.add_argument('--account-key', help='Azure storage account key')
spikes_subparser.add_argument('--container-name', help='Azure storage container name')
spikes_subparser.add_argument('--duration', type=period, help='Monitor for duration(ex: 1d, 6h, 2m')
spikes_subparser.add_argument('--sampling-rate', type=int, default=5, help='Monitoring frequncy rate in seconds')
spikes_subparser.add_argument('--mem', type=int, default=250000, help='Memory threshold in KB')
spikes_subparser.add_argument('--cpu', type=int, default=50, help='CPU threshold in percentage')
connectivitytest_subparser = subparsers.add_parser("connectivitytest", help="Perform connectivity test for MDE")
connectivitytest_subparser.add_argument("-o", '--onboarding-script', type=str, help="Path to onboarding script")
connectivitytest_subparser.add_argument("-g", '--geo', type=str, help="Geo string to test <US|UK|EU|AU|CH|IN>")
perf_subparser.set_defaults(performance=True)
installation_subparser.set_defaults(installation=True)
exclude_subparser.set_defaults(exclude=True)
certinfocollection_subparser.set_defaults(certinfocollection=True)
rate_limit_subparser.set_defaults(ratelimit=True)
tracing_subparser.set_defaults(trace=True)
skip_faulty_rules_subparser.set_defaults(skipfaultyrules=True)
spikes_subparser.set_defaults(observespikes=True)
connectivitytest_subparser.set_defaults(connectivitytest=True)
return { 'parser': parser, 'connectivitytest_subparser': connectivitytest_subparser }
def mandatory_args(args):
if not (args.diagnostic or args.performance or args.installation or args.exclude or args.trace or args.ratelimit or args.skipfaultyrules or args.connectivitytest or args.observespikes or args.certinfocollection):
return PrintHelp.MAIN
if args.connectivitytest and not (args.geo or args.onboarding_script):
return PrintHelp.CONNECTIVITYTEST
PrintHelp.NONE
#exclude should not run with perf or diagnostics
def mutually_excluded(args):
return not (args.exclude and (args.performance or args.diagnostic))
#rate limit should not run with perf or diagnostics
def mutually_rate_limit(args):
return not (args.ratelimit and (args.performance or args.diagnostic))
#skip faulty rules should not run with perf or diagnostics
def mutually_skip_faulty_rules(args):
return not (args.skipfaultyrules and (args.performance or args.diagnostic))
#installation report should not run with diagnostics
def mutually_installation_report(args):
return not (args.installation and (args.performance or args.diagnostic))
def main():
parsers = get_parser()
parser = parsers['parser']
args = parser.parse_args()
help = mandatory_args(args)
if help == PrintHelp.MAIN:
parser.print_help()
parser.exit()
elif help == PrintHelp.CONNECTIVITYTEST:
parsers['connectivitytest_subparser'].print_help()
parser.exit()
if args.outdir:
os.environ['TMPDIR'] = args.outdir
global log
log = logger.set_logger(tempfile.mkstemp(prefix=f'mde_tool_log_{datetime.datetime.now(datetime.timezone.utc).strftime("%Y_%m_%d_%H_%M_%S")}', suffix='.log')[1], constants.LOGGER_NAME)
log_tool_info()
if args.skip_mdatp or 'SKIP_MDATP' in os.environ:
os.environ['SKIP_MDATP'] = "True"
log.warning("Skip mdatp command, as SKIP_MDATP is set.")
if not mutually_excluded(args):
parser.error('Performance/Diagnostics and Exclude should be used individually')
if not mutually_rate_limit(args):
parser.error('Performance/Diagnostics and Rate limit should be used individually')
if not mutually_skip_faulty_rules(args):
parser.error('Performance/Diagnostics and Skip Faulty Rules should be used individually')
if not mutually_installation_report(args):
parser.error('Performance/Diagnostics and Installation should be used individually')
if not args.output:
args.output = os.path.join(tempfile.gettempdir(), f'{datetime.datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}_output')
output_path = args.output if args.no_zip else args.output + '.zip'
if args.trace:
if os_details().platform == constants.LINUX_PLATFORM:
log.info("Linux performance tracing is not supported.")
sys.exit(0)
trace_path = perf_trace(args)
if args.no_zip:
export_report_folder(trace_path, output_path)
else:
export_report_archive(trace_path, output_path)
return
if args.connectivitytest:
perform_test(args.geo, args.onboarding_script)
sys.exit(0)
# Verify privileges:
if os.geteuid() != 0 and not args.trace:
parser.error('Please run this tool using `sudo`')
if args.exclude:
if os_details().platform != constants.LINUX_PLATFORM:
parser.error('Exclude is currently supported only on Linux')
if args.certinfocollection:
if os_details().platform != constants.LINUX_PLATFORM:
parser.error('certinfocollection is currently supported only on Linux')
if args.ratelimit:
if os_details().platform != constants.LINUX_PLATFORM:
parser.error('Rate limit is currently supported only on Linux')
if args.skipfaultyrules:
if os_details().platform != constants.LINUX_PLATFORM:
parser.error('Skipping faulty rules is currently supported only on Linux')
if args.installation:
if os_details().platform != constants.LINUX_PLATFORM:
parser.error('Installation report is currently supported only on Linux')
if command_exists('mdatp'):
mdatp.lazy_fix_log_folder_issue()
if args.observespikes:
if args.upload and args.account_key is None and args.account_name is None and args.container_name is None:
parser.error("Required --account-name, --account-key, --container-name with --upload")
if os_details().platform != constants.LINUX_PLATFORM:
parser.error('Observing memory or cpu spikes is currently supported only on Linux')
try:
observe_cpu_mem_spikes(args)
except Exception as e:
log.error(f"Exception while observing cpu or memory spikes {e}")
return
if not args.bypass_disclaimer:
if not present_disclaimer():
return
log.info("Setup log level of MDE")
with mdatp.LogManager(args.interactive, args.delay, args.mdatp_log, args.max_log_size), SystemMonitor() as system_monitor:
system_monitor.info()
files_dict = dict()
if args.diagnostic or args.performance:
if os.path.exists(output_path) and not args.force:
parser.error('Chosen path already exists, please select non-existing path to export')
if args.performance:
#TODO: add it to checked prerequisites?
if os_details().platform == constants.LINUX_PLATFORM:
if not command_exists('perf'):
parser.error('perf is not installed')
files_dict["perf_benchmark.tar.gz"] = perf.capture_on_linux(secs=args.length,
frequency=args.frequency)
else:
files_dict["perf_benchmark.tar.gz"] = perf.capture_on_macos(secs=args.length,
frequency=args.frequency)
diag_files_dict = collect_diagnostic(args)
files_dict.update(diag_files_dict)
if os_details().platform == constants.LINUX_PLATFORM:
files_dict['installation_report.json'] = installation_report.capture_installation_report(args)
files_dict.update(system_monitor.stop())
# Generate Report XML
generate_report_xml()
# Export report
if args.no_zip:
export_report_folder(files_dict, output_path)
else:
export_report_archive(files_dict, output_path)
elif args.installation:
if os.path.exists(output_path) and not args.force:
parser.error('Chosen path already exists, please select non-existing path to export')
log.info('[Installation Report]')
files_dict['installation_report.json'] = installation_report.capture_installation_report(args)
if args.no_zip:
export_report_folder(files_dict, output_path)
else:
export_report_archive(files_dict, output_path)
elif args.ratelimit:
rate_limiter(args)
elif args.skipfaultyrules:
skip_faulty_rules(args)
elif args.certinfocollection:
certinfocollection(args)
else:
exclude(args)