From 789386b6d3a830a2acbf3ddf729424b09981c84d Mon Sep 17 00:00:00 2001 From: Itai Date: Sat, 14 Jun 2025 20:55:42 +0300 Subject: [PATCH 1/6] Switch to orjson for faster JSON parsing --- requirements.txt | 3 +- src/sysdiagnose/parsers/logarchive.py | 46 +++++++++++++++++++-------- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/requirements.txt b/requirements.txt index d44923b3..99e2bbf7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ numpy==2.2.0 nska-deserialize==1.5.1 yara-python==4.5.1 # pycrashreport==1.2.4 -python-json-logger==3.2.0 \ No newline at end of file +python-json-logger==3.2.0 +orjson>=3.9.10 \ No newline at end of file diff --git a/src/sysdiagnose/parsers/logarchive.py b/src/sysdiagnose/parsers/logarchive.py index b5b76263..94671968 100644 --- a/src/sysdiagnose/parsers/logarchive.py +++ b/src/sysdiagnose/parsers/logarchive.py @@ -16,6 +16,7 @@ import sys import tempfile import shutil +import orjson # fast JSON library # --------------------------------------------# @@ -40,6 +41,26 @@ # LATER consider refactoring using yield to lower memory consumption +# Wrapper functions: fall back to stdlib if orjson is not available +try: + def fast_json_loads(data): + # orjson.loads expects bytes-like; encode if we get str + if isinstance(data, str): + data = data.encode() + return orjson.loads(data) + + def fast_json_dumps(obj) -> str: + # orjson.dumps returns bytes; decode to str so the rest of the code can keep using text mode + return orjson.dumps(obj).decode() +except Exception: # pragma: no cover + # Fallback – should very rarely trigger, but keeps the module usable without the extra dep. + import json as _json_std + + def fast_json_loads(data): + return _json_std.loads(data) + + def fast_json_dumps(obj) -> str: + return _json_std.dumps(obj) class LogarchiveParser(BaseParserInterface): description = 'Parsing system_logs.logarchive folder' @@ -61,7 +82,7 @@ def execute(self) -> list | dict: tmp_output_file = os.path.join(tmp_outpath.name, 'logarchive.tmp') LogarchiveParser.parse_all_to_file(self.get_log_files(), tmp_output_file) with open(tmp_output_file, 'r') as f: - return [json.loads(line) for line in f] + return [fast_json_loads(line) for line in f] except IndexError: return {'error': 'No system_logs.logarchive/ folder found in logs/ directory'} @@ -79,7 +100,7 @@ def get_result(self, force: bool = False): with open(self.output_file, 'r') as f: for line in f: try: - yield json.loads(line) + yield fast_json_loads(line) except json.decoder.JSONDecodeError: # last lines of the native logarchive.jsonl file continue else: @@ -135,7 +156,7 @@ def merge_files(temp_files: list, output_file: str): with open(current_temp_file['file'].name, 'r') as f_in: copy_over = False # store if we need to copy over, spares us of json.loads() every line when we know we should be continuing for line in f_in: - if not copy_over and json.loads(line)['time'] > prev_temp_file['last_timestamp']: + if not copy_over and fast_json_loads(line)['time'] > prev_temp_file['last_timestamp']: copy_over = True if copy_over: f_out.write(line) @@ -144,7 +165,7 @@ def merge_files(temp_files: list, output_file: str): def get_first_and_last_entries(output_file: str) -> tuple: with open(output_file, 'rb') as f: - first_entry = json.loads(f.readline().decode()) + first_entry = fast_json_loads(f.readline()) # discover last line efficiently f.seek(-2, os.SEEK_END) # Move the pointer to the second-to-last byte in the file # Move backwards until a newline character is found, or we hit the 
start of the file @@ -155,7 +176,7 @@ def get_first_and_last_entries(output_file: str) -> tuple: f.seek(-2, os.SEEK_CUR) # Move backwards # Read the last line - last_entry = json.loads(f.readline().decode()) + last_entry = fast_json_loads(f.readline()) return (first_entry, last_entry) @@ -202,7 +223,7 @@ def parse_folder_to_file(input_folder: str, output_file: str) -> bool: logger.exception('Error: No system_logs.logarchive/ folder found in logs/ directory') return False - def __convert_using_native_logparser(input_folder: str, output_file: str) -> list: + def __convert_using_native_logparser(input_folder: str, output_file: str) -> None: with open(output_file, 'w') as f_out: # output to stdout and not to a file as we need to convert the output to a unified format cmd_array = ['/usr/bin/log', 'show', input_folder, '--style', 'ndjson', '--info', '--debug', '--signpost'] @@ -210,8 +231,8 @@ def __convert_using_native_logparser(input_folder: str, output_file: str) -> lis # this approach limits memory consumption for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array): try: - entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line)) - f_out.write(json.dumps(entry_json) + '\n') + entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(fast_json_loads(line)) + f_out.write(fast_json_dumps(entry_json) + '\n') except json.JSONDecodeError as e: logger.warning(f"WARNING: error parsing JSON {line} - {e}", exc_info=True) except KeyError: @@ -221,11 +242,10 @@ def __convert_using_native_logparser(input_folder: str, output_file: str) -> lis logger.debug(f"Looks like we arrive to the end of the file: {line}") break - def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> list: + def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> None: with open(output_file, 'w') as f: for entry in LogarchiveParser.__convert_using_unifiedlogparser_generator(input_folder): - json.dump(entry, f) - f.write('\n') + f.write(fast_json_dumps(entry) + '\n') @DeprecationWarning def __convert_using_unifiedlogparser_save_file(input_folder: str, output_file: str): @@ -245,7 +265,7 @@ def __convert_using_unifiedlogparser_generator(input_folder: str): # this approach limits memory consumption for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array): try: - entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(json.loads(line)) + entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(fast_json_loads(line)) yield entry_json except json.JSONDecodeError: pass @@ -276,7 +296,7 @@ def __execute_cmd_and_get_result(cmd_array: list, outputfile=None): if outputfile is None: for line in iter(process.stdout.readline, ''): try: - result.append(json.loads(line)) + result.append(fast_json_loads(line)) except Exception: result.append(line) elif outputfile == sys.stdout: From 1743ba16838cfeb9ab0fc967914fc81a6d8f7456 Mon Sep 17 00:00:00 2001 From: "roman.gorlach" Date: Wed, 18 Jun 2025 13:47:52 +0200 Subject: [PATCH 2/6] fix: apply linters --- src/sysdiagnose/analysers/ps_everywhere.py | 810 ++++++++++----------- 1 file changed, 372 insertions(+), 438 deletions(-) diff --git a/src/sysdiagnose/analysers/ps_everywhere.py b/src/sysdiagnose/analysers/ps_everywhere.py index 0802872d..94671968 100644 --- a/src/sysdiagnose/analysers/ps_everywhere.py +++ b/src/sysdiagnose/analysers/ps_everywhere.py @@ -1,447 +1,381 @@ #! 
/usr/bin/env python3 -from typing import Generator, Set, Optional -from sysdiagnose.utils.base import BaseAnalyserInterface, logger -from sysdiagnose.parsers.ps import PsParser -from sysdiagnose.parsers.psthread import PsThreadParser -from sysdiagnose.parsers.spindumpnosymbols import SpindumpNoSymbolsParser -from sysdiagnose.parsers.shutdownlogs import ShutdownLogsParser -from sysdiagnose.parsers.logarchive import LogarchiveParser -from sysdiagnose.parsers.logdata_statistics import LogDataStatisticsParser -from sysdiagnose.parsers.logdata_statistics_txt import LogDataStatisticsTxtParser -from sysdiagnose.parsers.uuid2path import UUID2PathParser -from sysdiagnose.parsers.taskinfo import TaskinfoParser -from sysdiagnose.parsers.remotectl_dumpstate import RemotectlDumpstateParser - - -class PsEverywhereAnalyser(BaseAnalyserInterface): - """ - Analyser that gathers process information from multiple sources - to build a comprehensive list of running processes across different system logs. - """ - - description = "List all processes we can find a bit everywhere." - format = "jsonl" +# For Python3 +# Script to parse system_logs.logarchive +# Author: david@autopsit.org +# +# +from collections.abc import Generator +from datetime import datetime, timezone +from sysdiagnose.utils.base import BaseParserInterface, logger +import glob +import json +import os +import platform +import subprocess +import sys +import tempfile +import shutil +import orjson # fast JSON library + +# --------------------------------------------# + +# On 2023-04-13: using ndjson instead of json to avoid parsing issues. +# Based on manpage: +# json JSON output. Event data is synthesized as an array of JSON dictionaries. +# +# ndjson Line-delimited JSON output. Event data is synthesized as JSON dictionaries, each emitted on a single line. +# A trailing record, identified by the inclusion of a 'finished' field, is emitted to indicate the end of events. +# +# cmd_parsing_osx = '/usr/bin/log show %s --style ndjson' # fastest and short version +# cmd_parsing_osx = '/usr/bin/log show %s --style json' # fastest and short version +# cmd_parsing_osx = '/usr/bin/log show %s --info --style json' # to enable debug, add --debug +# cmd_parsing_osx = '/usr/bin/log show %s --info --debug --style json' + +# Linux parsing relies on UnifiedLogReader: +# https://github.com/mandiant/macos-UnifiedLogs +# Follow instruction in the README.md in order to install it. +# TODO unifiedlog_parser is single threaded, either patch their code for multithreading support or do the magic here by parsing each file in a separate thread +cmd_parsing_linux_test = ['unifiedlog_iterator', '--help'] +# --------------------------------------------------------------------------- # + +# LATER consider refactoring using yield to lower memory consumption + +# Wrapper functions: fall back to stdlib if orjson is not available +try: + def fast_json_loads(data): + # orjson.loads expects bytes-like; encode if we get str + if isinstance(data, str): + data = data.encode() + return orjson.loads(data) + + def fast_json_dumps(obj) -> str: + # orjson.dumps returns bytes; decode to str so the rest of the code can keep using text mode + return orjson.dumps(obj).decode() +except Exception: # pragma: no cover + # Fallback – should very rarely trigger, but keeps the module usable without the extra dep. 
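# Illustrative sketch (not part of the patch): the optional-dependency pattern these wrappers
# aim for, with `import orjson` placed inside the try block so a missing package raises
# ImportError right here and the stdlib fallback is actually selected.
try:
    import orjson  # fast JSON library, optional

    def fast_json_loads(data):
        # mirror the wrapper above: hand orjson bytes, encoding str input first
        if isinstance(data, str):
            data = data.encode()
        return orjson.loads(data)

    def fast_json_dumps(obj) -> str:
        # orjson.dumps returns bytes; decode so callers keep working with text
        return orjson.dumps(obj).decode()
except ImportError:
    import json as _json_std

    def fast_json_loads(data):
        return _json_std.loads(data)

    def fast_json_dumps(obj) -> str:
        return _json_std.dumps(obj)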
+ import json as _json_std + + def fast_json_loads(data): + return _json_std.loads(data) + + def fast_json_dumps(obj) -> str: + return _json_std.dumps(obj) + +class LogarchiveParser(BaseParserInterface): + description = 'Parsing system_logs.logarchive folder' + format = 'jsonl' def __init__(self, config: dict, case_id: str): super().__init__(__file__, config, case_id) - self.all_ps: Set[str] = set() - - @staticmethod - def _strip_flags(process: str) -> str: - """ - Extracts the base command by removing everything after the first space. - - :param process: Full process command string. - :return: Command string without flags. - """ - process, *_ = process.partition(' ') - return process - - @staticmethod - def message_extract_binary(process: str, message: str) -> Optional[str | list[str]]: - """ - Extracts process_name from special messages: - 1. backboardd Signpost messages with process_name - 2. tccd process messages with binary_path - 3. '/kernel' process messages with app name mapping format 'App Name -> /path/to/app' - 4. configd SCDynamicStore client sessions showing connected processes - - :param process: Process name. - :param message: Log message potentially containing process information. - :return: Extracted process name, list of process names, or None if not found. - """ - # Case 1: Backboardd Signpost messages - if process == '/usr/libexec/backboardd' and 'Signpost' in message and 'process_name=' in message: - try: - # Find the process_name part in the message - process_name_start = message.find('process_name=') - if process_name_start != -1: - # Extract from after 'process_name=' to the next space or end of string - process_name_start += len('process_name=') - process_name_end = message.find(' ', process_name_start) - - if process_name_end == -1: # If no space after process_name - return message[process_name_start:] - else: - return message[process_name_start:process_name_end] - except Exception as e: - logger.debug(f"Error extracting process_name from backboardd: {e}") - - # Case 2: TCCD process messages - if process == '/System/Library/PrivateFrameworks/TCC.framework/Support/tccd' and 'binary_path=' in message: - try: - # Extract only the clean binary paths without additional context - binary_paths = [] - - # Find all occurrences of binary_path= in the message - start_pos = 0 - while True: - binary_path_start = message.find('binary_path=', start_pos) - if binary_path_start == -1: - break - - binary_path_start += len('binary_path=') - # Find the end of the path (comma, closing bracket, or end of string) - binary_path_end = None - for delimiter in [',', '}', ' access to', ' is checking']: - delimiter_pos = message.find(delimiter, binary_path_start) - if delimiter_pos != -1 and (binary_path_end is None or delimiter_pos < binary_path_end): - binary_path_end = delimiter_pos - - if binary_path_end is None: - path = message[binary_path_start:].strip() - else: - path = message[binary_path_start:binary_path_end].strip() - - # Skip paths with excessive information - if len(path) > 0 and path.startswith('/') and ' ' not in path: - binary_paths.append(path) - - # Move to position after the current binary_path - start_pos = binary_path_start + 1 - - # Return all valid binary paths - if binary_paths: - logger.debug(f"Extracted {len(binary_paths)} binary paths from tccd message") - return binary_paths if len(binary_paths) > 1 else binary_paths[0] - - except Exception as e: - logger.debug(f"Error extracting binary_path from tccd: {e}") - - # Case 3: /kernel process with App name mapping 
pattern "App Name -> /path/to/app" - if process == '/kernel' and ' -> ' in message and 'App Store Fast Path' in message: - try: - # Find the arrow mapping pattern - arrow_pos = message.find(' -> ') - if arrow_pos != -1: - path_start = arrow_pos + len(' -> ') - # Look for common path patterns - more flexible for kernel messages - if message[path_start:].startswith('/'): - # Find the end of the path (space or end of string) - path_end = message.find(' ', path_start) - if path_end == -1: # If no space after path - return message[path_start:] - else: - return message[path_start:path_end] - except Exception as e: - logger.debug(f"Error extracting app path from kernel mapping: {e}") - - # Case 4: configd SCDynamicStore client sessions - if process == '/usr/libexec/configd' and 'SCDynamicStore/client sessions' in message: - try: - # Process the list of connected clients from configd - process_paths = [] - lines = message.split('\n') - for line in lines: - line = line.strip() - if line.startswith('"') and '=' in line: - # Extract the client path from lines like ""/usr/sbin/mDNSResponder:null" = 1;" - client_path = line.split('"')[1] # Get the part between the first pair of quotes - if ':' in client_path: - # Extract the actual process path part (before the colon) - process_path = client_path.split(':')[0] - if process_path.startswith('/') or process_path.startswith('com.apple.'): - process_paths.append(process_path) - - # Return the list of process paths if any were found - if process_paths: - logger.debug(f"Extracted {len(process_paths)} process paths from configd message") - return process_paths - except Exception as e: - logger.debug(f"Error extracting client paths from configd SCDynamicStore: {e}") - - return None - - def execute(self) -> Generator[dict, None, None]: - """ - Executes all extraction methods dynamically, ensuring that each extracted process is unique. - - :yield: A dictionary containing process details from various sources. - """ - for func in dir(self): - if func.startswith(f"_{self.__class__.__name__}__extract_ps_"): - yield from getattr(self, func)() # Dynamically call extract methods - - def __extract_ps_base_file(self) -> Generator[dict, None, None]: - """ - Extracts process data from ps.txt. - - :return: A generator yielding dictionaries containing process details from ps.txt. - """ - entity_type = 'ps.txt' - try: - for p in PsParser(self.config, self.case_id).get_result(): - ps_event = { - 'process': self._strip_flags(p['command']), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - if self.add_if_full_command_is_not_in_set(ps_event['process']): - yield ps_event - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type} file. {e}") - - def __extract_ps_thread_file(self) -> Generator[dict, None, None]: - """ - Extracts process data from psthread.txt. - - :return: A generator yielding dictionaries containing process details from psthread.txt. - """ - entity_type = 'psthread.txt' - try: - for p in PsThreadParser(self.config, self.case_id).get_result(): - ps_event = { - 'process': self._strip_flags(p['command']), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - if self.add_if_full_command_is_not_in_set(ps_event['process']): - yield ps_event - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type} file. {e}") - - def __extract_ps_spindump_nosymbols_file(self) -> Generator[dict, None, None]: - """ - Extracts process data from spindump-nosymbols.txt. 
- - :return: A generator yielding dictionaries containing process and thread details from spindump-nosymbols.txt. - """ - entity_type = 'spindump-nosymbols.txt' - try: - for p in SpindumpNoSymbolsParser(self.config, self.case_id).get_result(): - if 'process' not in p: - continue - process_name = p.get('path', '/kernel' if p['process'] == 'kernel_task [0]' else p['process']) - - if self.add_if_full_command_is_not_in_set(self._strip_flags(process_name)): - yield { - 'process': self._strip_flags(process_name), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - - for t in p['threads']: - try: - thread_name = f"{self._strip_flags(process_name)}::{t['thread_name']}" - if self.add_if_full_command_is_not_in_set(thread_name): - yield { - 'process': thread_name, - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - except KeyError: - pass - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type} file. {e}") - - def __extract_ps_shutdownlogs(self) -> Generator[dict, None, None]: - """ - Extracts process data from shutdown logs. - - :return: A generator yielding dictionaries containing process details from shutdown logs. - """ - entity_type = 'shutdown.logs' - try: - for p in ShutdownLogsParser(self.config, self.case_id).get_result(): - if self.add_if_full_command_is_not_in_set(self._strip_flags(p['command'])): - yield { - 'process': self._strip_flags(p['command']), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type}. {e}") - - def __extract_ps_logarchive(self) -> Generator[dict, None, None]: - """ - Extracts process data from logarchive. - - :return: A generator yielding dictionaries containing process details from logarchive. - """ - entity_type = 'log archive' + + def get_log_files(self) -> list: + log_folder_glob = '**/system_logs.logarchive/' + return glob.glob(os.path.join(self.case_data_folder, log_folder_glob), recursive=True) + + @DeprecationWarning + def execute(self) -> list | dict: + # OK, this is really inefficient as we're reading all to memory, writing it to a temporary file on disk, and re-reading it again + # but who cares, nobody uses this function anyway... 
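# Illustrative aside (not part of the patch): tempfile.TemporaryDirectory() used as a context
# manager yields the directory path as a plain str, as in this minimal, self-contained sketch.
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:             # tmp_dir is already a str path
    tmp_output_file = os.path.join(tmp_dir, 'example.tmp')
    with open(tmp_output_file, 'w') as f:
        f.write('{"time": 0}\n')                           # placeholder content
    with open(tmp_output_file, 'r') as f:
        lines = f.readlines()                              # read back before the dir is removed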
try: - for p in LogarchiveParser(self.config, self.case_id).get_result(): - # First check if we can extract a binary from the message - if 'message' in p: - extracted_process = self.message_extract_binary(p['process'], p['message']) - if extracted_process: - # Handle the case where extracted_process is a list of paths - if isinstance(extracted_process, list): - for proc_path in extracted_process: - if self.add_if_full_command_is_not_in_set(self._strip_flags(proc_path)): - yield { - 'process': self._strip_flags(proc_path), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - else: - # Handle the case where it's a single string - if self.add_if_full_command_is_not_in_set(self._strip_flags(extracted_process)): - yield { - 'process': self._strip_flags(extracted_process), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - - # Process the original process name - if self.add_if_full_command_is_not_in_set(self._strip_flags(p['process'])): - yield { - 'process': self._strip_flags(p['process']), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type}. {e}") - - def __extract_ps_uuid2path(self) -> Generator[dict, None, None]: - """ - Extracts process data from UUID2PathParser. - - :return: A generator yielding process data from uuid2path. - """ - entity_type = 'uuid2path' + with tempfile.TemporaryDirectory() as tmp_outpath: + tmp_output_file = os.path.join(tmp_outpath.name, 'logarchive.tmp') + LogarchiveParser.parse_all_to_file(self.get_log_files(), tmp_output_file) + with open(tmp_output_file, 'r') as f: + return [fast_json_loads(line) for line in f] + except IndexError: + return {'error': 'No system_logs.logarchive/ folder found in logs/ directory'} + + def get_result(self, force: bool = False): + if force: + # force parsing + self.save_result(force) + + if not self._result: + if not self.output_exists(): + self.save_result() + + if self.output_exists(): + # load existing output + with open(self.output_file, 'r') as f: + for line in f: + try: + yield fast_json_loads(line) + except json.decoder.JSONDecodeError: # last lines of the native logarchive.jsonl file + continue + else: + # should never happen, as we never keep it in memory + for entry in self._result: + yield entry + + def save_result(self, force: bool = False, indent=None): + ''' + Save the result of the parsing operation to a file in the parser output folder + ''' + if not force and self._result is not None: + # the result was already computed, just save it now + super().save_result(force, indent) + else: + LogarchiveParser.parse_all_to_file(self.get_log_files(), self.output_file) + + def merge_files(temp_files: list, output_file: str): + for temp_file in temp_files: + first_entry, last_entry = LogarchiveParser.get_first_and_last_entries(temp_file['file'].name) + temp_file['first_timestamp'] = first_entry['time'] + temp_file['last_timestamp'] = last_entry['time'] + + # lowest first timestamp, second key highest last timestamp + temp_files.sort(key=lambda x: (x['first_timestamp'], -x['last_timestamp'])) + + # do the merging magic here + # Open output file, with r+, + # Look at next file, + # - if current_last < prev_last: continue # skip file + # - elif current_first > prev_last: copy over full file, prev_last=current_last + # - else: # need to seek to prev_last and copy over new data + # Continue with other files with the same logic. 
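# Illustrative sketch (not part of the patch): the three cases above as a toy classifier with
# hypothetical timestamps; the streaming implementation over NDJSON lines follows below.
def merge_action(prev_last: float, cur_first: float, cur_last: float) -> str:
    if cur_last < prev_last:
        return 'skip'        # everything in this file was already written
    if cur_first > prev_last:
        return 'copy-all'    # no overlap, append the whole file
    return 'copy-tail'       # partial overlap, append only entries with time > prev_last

# e.g. with prev_last = 200.0:
#   merge_action(200.0, 120.0, 180.0)  -> 'skip'
#   merge_action(200.0, 250.0, 300.0)  -> 'copy-all'
#   merge_action(200.0, 150.0, 260.0)  -> 'copy-tail'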
+ prev_temp_file = temp_files[0] + # first copy over first file to self.output_file + shutil.copyfile(prev_temp_file['file'].name, output_file) + with open(output_file, 'a') as f_out: + i = 1 + while i < len(temp_files): + current_temp_file = temp_files[i] + if current_temp_file['last_timestamp'] < prev_temp_file['last_timestamp']: + # skip file as we already have all the data + # no need to update the prev_temp_file variable + pass + elif current_temp_file['first_timestamp'] > prev_temp_file['last_timestamp']: + # copy over the full file + with open(current_temp_file['file'].name, 'r') as f_in: + for line in f_in: + f_out.write(line) + prev_temp_file = current_temp_file + else: + # need to seek to prev_last and copy over new data + with open(current_temp_file['file'].name, 'r') as f_in: + copy_over = False # store if we need to copy over, spares us of json.loads() every line when we know we should be continuing + for line in f_in: + if not copy_over and fast_json_loads(line)['time'] > prev_temp_file['last_timestamp']: + copy_over = True + if copy_over: + f_out.write(line) + prev_temp_file = current_temp_file + i += 1 + + def get_first_and_last_entries(output_file: str) -> tuple: + with open(output_file, 'rb') as f: + first_entry = fast_json_loads(f.readline()) + # discover last line efficiently + f.seek(-2, os.SEEK_END) # Move the pointer to the second-to-last byte in the file + # Move backwards until a newline character is found, or we hit the start of the file + while f.tell() > 0: + char = f.read(1) + if char == b'\n': + break + f.seek(-2, os.SEEK_CUR) # Move backwards + + # Read the last line + last_entry = fast_json_loads(f.readline()) + + return (first_entry, last_entry) + + def parse_all_to_file(folders: list, output_file: str): + # no caching + # simple mode: only one folder + if len(folders) == 1: + LogarchiveParser.parse_folder_to_file(folders[0], output_file) + return + + # complex mode: multiple folders, need to merge multiple files + # for each of the log folders + # - parse it to a temporary file, keep track of the file reference or name + # - keep track of the first and last timestamp of each file + # - order the files, and if a file contains a subset of another one, skip it. + # this is a though one, as we may have partially overlapping timeframes, so we may need to re-assemble in a smart way. + # - once we know the order, bring the files together to the final single output file + + temp_files = [] try: - for p in UUID2PathParser(self.config, self.case_id).get_result().values(): - if self.add_if_full_command_is_not_in_set(self._strip_flags(p)): - yield { - 'process': self._strip_flags(p), - 'timestamp': None, - 'datetime': None, - 'source': entity_type - } - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type}. {e}") - - def __extract_ps_taskinfo(self) -> Generator[dict, None, None]: - """ - Extracts process and thread information from TaskinfoParser. - - :return: A generator yielding process and thread information from taskinfo. 
- """ - entity_type = 'taskinfo.txt' + for folder in folders: + temp_file = tempfile.NamedTemporaryFile(delete=False) + LogarchiveParser.parse_folder_to_file(folder, temp_file.name) + temp_files.append({ + 'file': temp_file, + }) + + # merge files to the output file + LogarchiveParser.merge_files(temp_files, output_file) + + finally: + # close all temp files, ensuring they are deleted + for temp_file in temp_files: + os.remove(temp_file['file'].name) + + def parse_folder_to_file(input_folder: str, output_file: str) -> bool: try: - for p in TaskinfoParser(self.config, self.case_id).get_result(): - if 'name' not in p: - continue - - if self.add_if_full_path_is_not_in_set(self._strip_flags(p['name'])): - yield { - 'process': self._strip_flags(p['name']), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - - for t in p['threads']: + if (platform.system() == 'Darwin'): + LogarchiveParser.__convert_using_native_logparser(input_folder, output_file) + else: + LogarchiveParser.__convert_using_unifiedlogparser(input_folder, output_file) + return True + except IndexError: + logger.exception('Error: No system_logs.logarchive/ folder found in logs/ directory') + return False + + def __convert_using_native_logparser(input_folder: str, output_file: str) -> None: + with open(output_file, 'w') as f_out: + # output to stdout and not to a file as we need to convert the output to a unified format + cmd_array = ['/usr/bin/log', 'show', input_folder, '--style', 'ndjson', '--info', '--debug', '--signpost'] + # read each line, convert line by line and write the output directly to the new file + # this approach limits memory consumption + for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array): + try: + entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(fast_json_loads(line)) + f_out.write(fast_json_dumps(entry_json) + '\n') + except json.JSONDecodeError as e: + logger.warning(f"WARNING: error parsing JSON {line} - {e}", exc_info=True) + except KeyError: + # last line of log does not contain 'time' field, nor the rest of the data. + # so just ignore it and all the rest. 
+ # last line looks like {'count':xyz, 'finished':1} + logger.debug(f"Looks like we arrive to the end of the file: {line}") + break + + def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> None: + with open(output_file, 'w') as f: + for entry in LogarchiveParser.__convert_using_unifiedlogparser_generator(input_folder): + f.write(fast_json_dumps(entry) + '\n') + + @DeprecationWarning + def __convert_using_unifiedlogparser_save_file(input_folder: str, output_file: str): + logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X') + # output to stdout and not to a file as we need to convert the output to a unified format + cmd_array = ['unifiedlog_iterator', '--mode', 'log-archive', '--input', input_folder, '--output', output_file, '--format', 'jsonl'] + # read each line, convert line by line and write the output directly to the new file + # this approach limits memory consumption + result = LogarchiveParser.__execute_cmd_and_get_result(cmd_array) + return result + + def __convert_using_unifiedlogparser_generator(input_folder: str): + logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X') + # output to stdout and not to a file as we need to convert the output to a unified format + cmd_array = ['unifiedlog_iterator', '--mode', 'log-archive', '--input', input_folder, '--format', 'jsonl'] + # read each line, convert line by line and write the output directly to the new file + # this approach limits memory consumption + for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array): + try: + entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(fast_json_loads(line)) + yield entry_json + except json.JSONDecodeError: + pass + except KeyError: + pass + + def __execute_cmd_and_yield_result(cmd_array: list) -> Generator[dict, None, None]: + ''' + Return None if it failed or the result otherwise. + + ''' + with subprocess.Popen(cmd_array, stdout=subprocess.PIPE, universal_newlines=True) as process: + for line in iter(process.stdout.readline, ''): + yield line + + def __execute_cmd_and_get_result(cmd_array: list, outputfile=None): + ''' + Return None if it failed or the result otherwise. + + Outfile can have 3 values: + - None: no output except return value + - sys.stdout: print to stdout + - path to a file to write to + ''' + result = [] + + with subprocess.Popen(cmd_array, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) as process: + if outputfile is None: + for line in iter(process.stdout.readline, ''): try: - thread_name = f"{self._strip_flags(p['name'])}::{t['thread name']}" - if self.add_if_full_path_is_not_in_set(thread_name): - yield { - 'process': thread_name, - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - except KeyError: - pass - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type}. {e}") - - def __extract_ps_remotectl_dumpstate(self) -> Generator[dict, None, None]: - """ - Extracts process data from RemotectlDumpstateParser. - - :return: A generator yielding process data from remotectl_dumpstate.txt. 
- """ - entity_type = 'remotectl_dumpstate.txt' - try: - remotectl_dumpstate_json = RemotectlDumpstateParser(self.config, self.case_id).get_result() - if remotectl_dumpstate_json: - for p in remotectl_dumpstate_json['Local device']['Services']: - if self.add_if_full_path_is_not_in_set(self._strip_flags(p)): - yield { - 'process': self._strip_flags(p), - 'timestamp': None, - 'datetime': None, - 'source': entity_type - } - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type}. {e}") - - def __extract_ps_logdata_statistics(self) -> Generator[dict, None, None]: - """ - Extracts process data from logdata_statistics.jsonl. - - :return: A generator yielding process data from logdata_statistics.jsonl. - """ - entity_type = 'logdata.statistics.jsonl' - try: - for p in LogDataStatisticsParser(self.config, self.case_id).get_result(): - if self.add_if_full_command_is_not_in_set(self._strip_flags(p['process'])): - yield { - 'process': self._strip_flags(p['process']), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type}. {e}") - - def __extract_ps_logdata_statistics_txt(self) -> Generator[dict, None, None]: - """ - Extracts process data from logdata.statistics.txt. - - :return: A generator yielding process data from logdata.statistics.txt. - """ - entity_type = "logdata.statistics.txt" - - try: - for p in LogDataStatisticsTxtParser(self.config, self.case_id).get_result(): - if self.add_if_full_path_is_not_in_set(self._strip_flags(p['process'])): - yield { - 'process': self._strip_flags(p['process']), - 'timestamp': p['timestamp'], - 'datetime': p['datetime'], - 'source': entity_type - } - except Exception as e: - logger.exception(f"ERROR while extracting {entity_type}. {e}") - - def add_if_full_path_is_not_in_set(self, name: str) -> bool: - """ - Ensures that a process path is unique before adding it to the shared set. - - :param name: Process path name - :return: True if the process was not in the set and was added, False otherwise. - """ - for item in self.all_ps: - if item.endswith(name): - return False - if item.split('::')[0].endswith(name): - return False - if '::' not in item and item.split(' ')[0].endswith(name): - return False # This covers cases with space-separated commands - self.all_ps.add(name) - return True - - def add_if_full_command_is_not_in_set(self, name: str) -> bool: - """ - Ensures that a process command is unique before adding it to the shared set. - - :param name: Process command name - :return: True if the process was not in the set and was added, False otherwise. - """ - for item in self.all_ps: - if item.startswith(name): - return False - self.all_ps.add(name) - return True + result.append(fast_json_loads(line)) + except Exception: + result.append(line) + elif outputfile == sys.stdout: + for line in iter(process.stdout.readline, ''): + print(line) + else: + with open(outputfile, 'w') as outfd: + for line in iter(process.stdout.readline, ''): + outfd.write(line) + result = f'Output written to {outputfile}' + + return result + + def convert_entry_to_unifiedlog_format(entry: dict) -> dict: + ''' + Convert the entry to unifiedlog format + ''' + # already in the Mandiant unifiedlog format + if 'event_type' in entry: + timestamp = LogarchiveParser.convert_unifiedlog_time_to_datetime(entry['time']) + entry['datetime'] = timestamp.isoformat(timespec='microseconds') + entry['timestamp'] = timestamp.timestamp() + return entry + ''' + jq '. 
|= keys' logarchive-native.json > native_keys.txt + sort native_keys.txt | uniq -c | sort -n > native_keys_sort_unique.txt + ''' + + mapper = { + 'creatorActivityID': 'activity_id', + 'messageType': 'log_type', + # 'source': '', # not present in the Mandiant format + # 'backtrace': '', # sub-dictionary + 'activityIdentifier': 'activity_id', + 'bootUUID': 'boot_uuid', # remove - in the UUID + 'category': 'category', + 'eventMessage': 'message', + 'eventType': 'event_type', + 'formatString': 'raw_message', + # 'machTimestamp': '', # not present in the Mandiant format + # 'parentActivityIdentifier': '', # not present in the Mandiant format + 'processID': 'pid', + 'processImagePath': 'process', + 'processImageUUID': 'process_uuid', # remove - in the UUID + 'senderImagePath': 'library', + 'senderImageUUID': 'library_uuid', # remove - in the UUID + # 'senderProgramCounter': '', # not present in the Mandiant format + 'subsystem': 'subsystem', + 'threadID': 'thread_id', + 'timestamp': 'time', # requires conversion + 'timezoneName': 'timezone_name', # ignore timezone as time and timestamp are correct + # 'traceID': '', # not present in the Mandiant format + 'userID': 'euid' + } + + new_entry = {} + for key, value in entry.items(): + if key in mapper: + new_key = mapper[key] + if 'uuid' in new_key: # remove - in UUID + new_entry[new_key] = value.replace('-', '') + else: + new_entry[new_key] = value + else: + # keep the non-matching entries + new_entry[key] = value + # convert time + timestamp = datetime.fromisoformat(new_entry['time']) + new_entry['datetime'] = timestamp.isoformat(timespec='microseconds') + new_entry['timestamp'] = timestamp.timestamp() + new_entry['time'] = new_entry['timestamp'] * 1000000000 + + return new_entry + + def convert_native_time_to_unifiedlog_format(time: str) -> int: + timestamp = datetime.fromisoformat(time) + return int(timestamp.timestamp() * 1000000000) + + def convert_unifiedlog_time_to_datetime(time: int) -> datetime: + # convert time to datetime object + timestamp = datetime.fromtimestamp(time / 1000000000, tz=timezone.utc) + return timestamp From cb7307e3ba3247ed3105c0b58fcf32f82550072d Mon Sep 17 00:00:00 2001 From: "roman.gorlach" Date: Wed, 18 Jun 2025 13:52:47 +0200 Subject: [PATCH 3/6] fix: apply linters --- src/sysdiagnose/analysers/ps_everywhere.py | 810 +++++++++++---------- 1 file changed, 438 insertions(+), 372 deletions(-) diff --git a/src/sysdiagnose/analysers/ps_everywhere.py b/src/sysdiagnose/analysers/ps_everywhere.py index 94671968..0802872d 100644 --- a/src/sysdiagnose/analysers/ps_everywhere.py +++ b/src/sysdiagnose/analysers/ps_everywhere.py @@ -1,381 +1,447 @@ #! /usr/bin/env python3 -# For Python3 -# Script to parse system_logs.logarchive -# Author: david@autopsit.org -# -# -from collections.abc import Generator -from datetime import datetime, timezone -from sysdiagnose.utils.base import BaseParserInterface, logger -import glob -import json -import os -import platform -import subprocess -import sys -import tempfile -import shutil -import orjson # fast JSON library - -# --------------------------------------------# - -# On 2023-04-13: using ndjson instead of json to avoid parsing issues. -# Based on manpage: -# json JSON output. Event data is synthesized as an array of JSON dictionaries. -# -# ndjson Line-delimited JSON output. Event data is synthesized as JSON dictionaries, each emitted on a single line. -# A trailing record, identified by the inclusion of a 'finished' field, is emitted to indicate the end of events. 
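# Illustrative sketch (not part of the patch): a minimal reader that stops at that trailing
# 'finished' record when consuming NDJSON line by line (field names taken from the comments
# in this file; the exact summary line emitted by `log show` may vary).
import json

def iter_ndjson_events(lines):
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue                     # tolerate stray non-JSON lines
        if 'finished' in entry:          # e.g. {"count": 123, "finished": 1}
            break                        # end-of-events marker, stop here
        yield entry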
-# -# cmd_parsing_osx = '/usr/bin/log show %s --style ndjson' # fastest and short version -# cmd_parsing_osx = '/usr/bin/log show %s --style json' # fastest and short version -# cmd_parsing_osx = '/usr/bin/log show %s --info --style json' # to enable debug, add --debug -# cmd_parsing_osx = '/usr/bin/log show %s --info --debug --style json' - -# Linux parsing relies on UnifiedLogReader: -# https://github.com/mandiant/macos-UnifiedLogs -# Follow instruction in the README.md in order to install it. -# TODO unifiedlog_parser is single threaded, either patch their code for multithreading support or do the magic here by parsing each file in a separate thread -cmd_parsing_linux_test = ['unifiedlog_iterator', '--help'] -# --------------------------------------------------------------------------- # - -# LATER consider refactoring using yield to lower memory consumption - -# Wrapper functions: fall back to stdlib if orjson is not available -try: - def fast_json_loads(data): - # orjson.loads expects bytes-like; encode if we get str - if isinstance(data, str): - data = data.encode() - return orjson.loads(data) - - def fast_json_dumps(obj) -> str: - # orjson.dumps returns bytes; decode to str so the rest of the code can keep using text mode - return orjson.dumps(obj).decode() -except Exception: # pragma: no cover - # Fallback – should very rarely trigger, but keeps the module usable without the extra dep. - import json as _json_std - - def fast_json_loads(data): - return _json_std.loads(data) - - def fast_json_dumps(obj) -> str: - return _json_std.dumps(obj) - -class LogarchiveParser(BaseParserInterface): - description = 'Parsing system_logs.logarchive folder' - format = 'jsonl' +from typing import Generator, Set, Optional +from sysdiagnose.utils.base import BaseAnalyserInterface, logger +from sysdiagnose.parsers.ps import PsParser +from sysdiagnose.parsers.psthread import PsThreadParser +from sysdiagnose.parsers.spindumpnosymbols import SpindumpNoSymbolsParser +from sysdiagnose.parsers.shutdownlogs import ShutdownLogsParser +from sysdiagnose.parsers.logarchive import LogarchiveParser +from sysdiagnose.parsers.logdata_statistics import LogDataStatisticsParser +from sysdiagnose.parsers.logdata_statistics_txt import LogDataStatisticsTxtParser +from sysdiagnose.parsers.uuid2path import UUID2PathParser +from sysdiagnose.parsers.taskinfo import TaskinfoParser +from sysdiagnose.parsers.remotectl_dumpstate import RemotectlDumpstateParser + + +class PsEverywhereAnalyser(BaseAnalyserInterface): + """ + Analyser that gathers process information from multiple sources + to build a comprehensive list of running processes across different system logs. + """ + + description = "List all processes we can find a bit everywhere." + format = "jsonl" def __init__(self, config: dict, case_id: str): super().__init__(__file__, config, case_id) - - def get_log_files(self) -> list: - log_folder_glob = '**/system_logs.logarchive/' - return glob.glob(os.path.join(self.case_data_folder, log_folder_glob), recursive=True) - - @DeprecationWarning - def execute(self) -> list | dict: - # OK, this is really inefficient as we're reading all to memory, writing it to a temporary file on disk, and re-reading it again - # but who cares, nobody uses this function anyway... + self.all_ps: Set[str] = set() + + @staticmethod + def _strip_flags(process: str) -> str: + """ + Extracts the base command by removing everything after the first space. + + :param process: Full process command string. + :return: Command string without flags. 
+ """ + process, *_ = process.partition(' ') + return process + + @staticmethod + def message_extract_binary(process: str, message: str) -> Optional[str | list[str]]: + """ + Extracts process_name from special messages: + 1. backboardd Signpost messages with process_name + 2. tccd process messages with binary_path + 3. '/kernel' process messages with app name mapping format 'App Name -> /path/to/app' + 4. configd SCDynamicStore client sessions showing connected processes + + :param process: Process name. + :param message: Log message potentially containing process information. + :return: Extracted process name, list of process names, or None if not found. + """ + # Case 1: Backboardd Signpost messages + if process == '/usr/libexec/backboardd' and 'Signpost' in message and 'process_name=' in message: + try: + # Find the process_name part in the message + process_name_start = message.find('process_name=') + if process_name_start != -1: + # Extract from after 'process_name=' to the next space or end of string + process_name_start += len('process_name=') + process_name_end = message.find(' ', process_name_start) + + if process_name_end == -1: # If no space after process_name + return message[process_name_start:] + else: + return message[process_name_start:process_name_end] + except Exception as e: + logger.debug(f"Error extracting process_name from backboardd: {e}") + + # Case 2: TCCD process messages + if process == '/System/Library/PrivateFrameworks/TCC.framework/Support/tccd' and 'binary_path=' in message: + try: + # Extract only the clean binary paths without additional context + binary_paths = [] + + # Find all occurrences of binary_path= in the message + start_pos = 0 + while True: + binary_path_start = message.find('binary_path=', start_pos) + if binary_path_start == -1: + break + + binary_path_start += len('binary_path=') + # Find the end of the path (comma, closing bracket, or end of string) + binary_path_end = None + for delimiter in [',', '}', ' access to', ' is checking']: + delimiter_pos = message.find(delimiter, binary_path_start) + if delimiter_pos != -1 and (binary_path_end is None or delimiter_pos < binary_path_end): + binary_path_end = delimiter_pos + + if binary_path_end is None: + path = message[binary_path_start:].strip() + else: + path = message[binary_path_start:binary_path_end].strip() + + # Skip paths with excessive information + if len(path) > 0 and path.startswith('/') and ' ' not in path: + binary_paths.append(path) + + # Move to position after the current binary_path + start_pos = binary_path_start + 1 + + # Return all valid binary paths + if binary_paths: + logger.debug(f"Extracted {len(binary_paths)} binary paths from tccd message") + return binary_paths if len(binary_paths) > 1 else binary_paths[0] + + except Exception as e: + logger.debug(f"Error extracting binary_path from tccd: {e}") + + # Case 3: /kernel process with App name mapping pattern "App Name -> /path/to/app" + if process == '/kernel' and ' -> ' in message and 'App Store Fast Path' in message: + try: + # Find the arrow mapping pattern + arrow_pos = message.find(' -> ') + if arrow_pos != -1: + path_start = arrow_pos + len(' -> ') + # Look for common path patterns - more flexible for kernel messages + if message[path_start:].startswith('/'): + # Find the end of the path (space or end of string) + path_end = message.find(' ', path_start) + if path_end == -1: # If no space after path + return message[path_start:] + else: + return message[path_start:path_end] + except Exception as e: + 
logger.debug(f"Error extracting app path from kernel mapping: {e}") + + # Case 4: configd SCDynamicStore client sessions + if process == '/usr/libexec/configd' and 'SCDynamicStore/client sessions' in message: + try: + # Process the list of connected clients from configd + process_paths = [] + lines = message.split('\n') + for line in lines: + line = line.strip() + if line.startswith('"') and '=' in line: + # Extract the client path from lines like ""/usr/sbin/mDNSResponder:null" = 1;" + client_path = line.split('"')[1] # Get the part between the first pair of quotes + if ':' in client_path: + # Extract the actual process path part (before the colon) + process_path = client_path.split(':')[0] + if process_path.startswith('/') or process_path.startswith('com.apple.'): + process_paths.append(process_path) + + # Return the list of process paths if any were found + if process_paths: + logger.debug(f"Extracted {len(process_paths)} process paths from configd message") + return process_paths + except Exception as e: + logger.debug(f"Error extracting client paths from configd SCDynamicStore: {e}") + + return None + + def execute(self) -> Generator[dict, None, None]: + """ + Executes all extraction methods dynamically, ensuring that each extracted process is unique. + + :yield: A dictionary containing process details from various sources. + """ + for func in dir(self): + if func.startswith(f"_{self.__class__.__name__}__extract_ps_"): + yield from getattr(self, func)() # Dynamically call extract methods + + def __extract_ps_base_file(self) -> Generator[dict, None, None]: + """ + Extracts process data from ps.txt. + + :return: A generator yielding dictionaries containing process details from ps.txt. + """ + entity_type = 'ps.txt' try: - with tempfile.TemporaryDirectory() as tmp_outpath: - tmp_output_file = os.path.join(tmp_outpath.name, 'logarchive.tmp') - LogarchiveParser.parse_all_to_file(self.get_log_files(), tmp_output_file) - with open(tmp_output_file, 'r') as f: - return [fast_json_loads(line) for line in f] - except IndexError: - return {'error': 'No system_logs.logarchive/ folder found in logs/ directory'} - - def get_result(self, force: bool = False): - if force: - # force parsing - self.save_result(force) - - if not self._result: - if not self.output_exists(): - self.save_result() - - if self.output_exists(): - # load existing output - with open(self.output_file, 'r') as f: - for line in f: - try: - yield fast_json_loads(line) - except json.decoder.JSONDecodeError: # last lines of the native logarchive.jsonl file - continue - else: - # should never happen, as we never keep it in memory - for entry in self._result: - yield entry - - def save_result(self, force: bool = False, indent=None): - ''' - Save the result of the parsing operation to a file in the parser output folder - ''' - if not force and self._result is not None: - # the result was already computed, just save it now - super().save_result(force, indent) - else: - LogarchiveParser.parse_all_to_file(self.get_log_files(), self.output_file) - - def merge_files(temp_files: list, output_file: str): - for temp_file in temp_files: - first_entry, last_entry = LogarchiveParser.get_first_and_last_entries(temp_file['file'].name) - temp_file['first_timestamp'] = first_entry['time'] - temp_file['last_timestamp'] = last_entry['time'] - - # lowest first timestamp, second key highest last timestamp - temp_files.sort(key=lambda x: (x['first_timestamp'], -x['last_timestamp'])) - - # do the merging magic here - # Open output file, with r+, - # Look at 
next file, - # - if current_last < prev_last: continue # skip file - # - elif current_first > prev_last: copy over full file, prev_last=current_last - # - else: # need to seek to prev_last and copy over new data - # Continue with other files with the same logic. - prev_temp_file = temp_files[0] - # first copy over first file to self.output_file - shutil.copyfile(prev_temp_file['file'].name, output_file) - with open(output_file, 'a') as f_out: - i = 1 - while i < len(temp_files): - current_temp_file = temp_files[i] - if current_temp_file['last_timestamp'] < prev_temp_file['last_timestamp']: - # skip file as we already have all the data - # no need to update the prev_temp_file variable - pass - elif current_temp_file['first_timestamp'] > prev_temp_file['last_timestamp']: - # copy over the full file - with open(current_temp_file['file'].name, 'r') as f_in: - for line in f_in: - f_out.write(line) - prev_temp_file = current_temp_file - else: - # need to seek to prev_last and copy over new data - with open(current_temp_file['file'].name, 'r') as f_in: - copy_over = False # store if we need to copy over, spares us of json.loads() every line when we know we should be continuing - for line in f_in: - if not copy_over and fast_json_loads(line)['time'] > prev_temp_file['last_timestamp']: - copy_over = True - if copy_over: - f_out.write(line) - prev_temp_file = current_temp_file - i += 1 - - def get_first_and_last_entries(output_file: str) -> tuple: - with open(output_file, 'rb') as f: - first_entry = fast_json_loads(f.readline()) - # discover last line efficiently - f.seek(-2, os.SEEK_END) # Move the pointer to the second-to-last byte in the file - # Move backwards until a newline character is found, or we hit the start of the file - while f.tell() > 0: - char = f.read(1) - if char == b'\n': - break - f.seek(-2, os.SEEK_CUR) # Move backwards - - # Read the last line - last_entry = fast_json_loads(f.readline()) - - return (first_entry, last_entry) - - def parse_all_to_file(folders: list, output_file: str): - # no caching - # simple mode: only one folder - if len(folders) == 1: - LogarchiveParser.parse_folder_to_file(folders[0], output_file) - return - - # complex mode: multiple folders, need to merge multiple files - # for each of the log folders - # - parse it to a temporary file, keep track of the file reference or name - # - keep track of the first and last timestamp of each file - # - order the files, and if a file contains a subset of another one, skip it. - # this is a though one, as we may have partially overlapping timeframes, so we may need to re-assemble in a smart way. - # - once we know the order, bring the files together to the final single output file - - temp_files = [] + for p in PsParser(self.config, self.case_id).get_result(): + ps_event = { + 'process': self._strip_flags(p['command']), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + if self.add_if_full_command_is_not_in_set(ps_event['process']): + yield ps_event + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type} file. {e}") + + def __extract_ps_thread_file(self) -> Generator[dict, None, None]: + """ + Extracts process data from psthread.txt. + + :return: A generator yielding dictionaries containing process details from psthread.txt. 
+ """ + entity_type = 'psthread.txt' try: - for folder in folders: - temp_file = tempfile.NamedTemporaryFile(delete=False) - LogarchiveParser.parse_folder_to_file(folder, temp_file.name) - temp_files.append({ - 'file': temp_file, - }) - - # merge files to the output file - LogarchiveParser.merge_files(temp_files, output_file) - - finally: - # close all temp files, ensuring they are deleted - for temp_file in temp_files: - os.remove(temp_file['file'].name) - - def parse_folder_to_file(input_folder: str, output_file: str) -> bool: + for p in PsThreadParser(self.config, self.case_id).get_result(): + ps_event = { + 'process': self._strip_flags(p['command']), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + if self.add_if_full_command_is_not_in_set(ps_event['process']): + yield ps_event + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type} file. {e}") + + def __extract_ps_spindump_nosymbols_file(self) -> Generator[dict, None, None]: + """ + Extracts process data from spindump-nosymbols.txt. + + :return: A generator yielding dictionaries containing process and thread details from spindump-nosymbols.txt. + """ + entity_type = 'spindump-nosymbols.txt' try: - if (platform.system() == 'Darwin'): - LogarchiveParser.__convert_using_native_logparser(input_folder, output_file) - else: - LogarchiveParser.__convert_using_unifiedlogparser(input_folder, output_file) - return True - except IndexError: - logger.exception('Error: No system_logs.logarchive/ folder found in logs/ directory') - return False - - def __convert_using_native_logparser(input_folder: str, output_file: str) -> None: - with open(output_file, 'w') as f_out: - # output to stdout and not to a file as we need to convert the output to a unified format - cmd_array = ['/usr/bin/log', 'show', input_folder, '--style', 'ndjson', '--info', '--debug', '--signpost'] - # read each line, convert line by line and write the output directly to the new file - # this approach limits memory consumption - for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array): - try: - entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(fast_json_loads(line)) - f_out.write(fast_json_dumps(entry_json) + '\n') - except json.JSONDecodeError as e: - logger.warning(f"WARNING: error parsing JSON {line} - {e}", exc_info=True) - except KeyError: - # last line of log does not contain 'time' field, nor the rest of the data. - # so just ignore it and all the rest. 
- # last line looks like {'count':xyz, 'finished':1} - logger.debug(f"Looks like we arrive to the end of the file: {line}") - break - - def __convert_using_unifiedlogparser(input_folder: str, output_file: str) -> None: - with open(output_file, 'w') as f: - for entry in LogarchiveParser.__convert_using_unifiedlogparser_generator(input_folder): - f.write(fast_json_dumps(entry) + '\n') - - @DeprecationWarning - def __convert_using_unifiedlogparser_save_file(input_folder: str, output_file: str): - logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X') - # output to stdout and not to a file as we need to convert the output to a unified format - cmd_array = ['unifiedlog_iterator', '--mode', 'log-archive', '--input', input_folder, '--output', output_file, '--format', 'jsonl'] - # read each line, convert line by line and write the output directly to the new file - # this approach limits memory consumption - result = LogarchiveParser.__execute_cmd_and_get_result(cmd_array) - return result - - def __convert_using_unifiedlogparser_generator(input_folder: str): - logger.warning('WARNING: using Mandiant UnifiedLogReader to parse logs, results will be less reliable than on OS X') - # output to stdout and not to a file as we need to convert the output to a unified format - cmd_array = ['unifiedlog_iterator', '--mode', 'log-archive', '--input', input_folder, '--format', 'jsonl'] - # read each line, convert line by line and write the output directly to the new file - # this approach limits memory consumption - for line in LogarchiveParser.__execute_cmd_and_yield_result(cmd_array): - try: - entry_json = LogarchiveParser.convert_entry_to_unifiedlog_format(fast_json_loads(line)) - yield entry_json - except json.JSONDecodeError: - pass - except KeyError: - pass - - def __execute_cmd_and_yield_result(cmd_array: list) -> Generator[dict, None, None]: - ''' - Return None if it failed or the result otherwise. - - ''' - with subprocess.Popen(cmd_array, stdout=subprocess.PIPE, universal_newlines=True) as process: - for line in iter(process.stdout.readline, ''): - yield line - - def __execute_cmd_and_get_result(cmd_array: list, outputfile=None): - ''' - Return None if it failed or the result otherwise. - - Outfile can have 3 values: - - None: no output except return value - - sys.stdout: print to stdout - - path to a file to write to - ''' - result = [] - - with subprocess.Popen(cmd_array, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) as process: - if outputfile is None: - for line in iter(process.stdout.readline, ''): + for p in SpindumpNoSymbolsParser(self.config, self.case_id).get_result(): + if 'process' not in p: + continue + process_name = p.get('path', '/kernel' if p['process'] == 'kernel_task [0]' else p['process']) + + if self.add_if_full_command_is_not_in_set(self._strip_flags(process_name)): + yield { + 'process': self._strip_flags(process_name), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + + for t in p['threads']: + try: + thread_name = f"{self._strip_flags(process_name)}::{t['thread_name']}" + if self.add_if_full_command_is_not_in_set(thread_name): + yield { + 'process': thread_name, + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + except KeyError: + pass + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type} file. 
{e}") + + def __extract_ps_shutdownlogs(self) -> Generator[dict, None, None]: + """ + Extracts process data from shutdown logs. + + :return: A generator yielding dictionaries containing process details from shutdown logs. + """ + entity_type = 'shutdown.logs' + try: + for p in ShutdownLogsParser(self.config, self.case_id).get_result(): + if self.add_if_full_command_is_not_in_set(self._strip_flags(p['command'])): + yield { + 'process': self._strip_flags(p['command']), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type}. {e}") + + def __extract_ps_logarchive(self) -> Generator[dict, None, None]: + """ + Extracts process data from logarchive. + + :return: A generator yielding dictionaries containing process details from logarchive. + """ + entity_type = 'log archive' + try: + for p in LogarchiveParser(self.config, self.case_id).get_result(): + # First check if we can extract a binary from the message + if 'message' in p: + extracted_process = self.message_extract_binary(p['process'], p['message']) + if extracted_process: + # Handle the case where extracted_process is a list of paths + if isinstance(extracted_process, list): + for proc_path in extracted_process: + if self.add_if_full_command_is_not_in_set(self._strip_flags(proc_path)): + yield { + 'process': self._strip_flags(proc_path), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + else: + # Handle the case where it's a single string + if self.add_if_full_command_is_not_in_set(self._strip_flags(extracted_process)): + yield { + 'process': self._strip_flags(extracted_process), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + + # Process the original process name + if self.add_if_full_command_is_not_in_set(self._strip_flags(p['process'])): + yield { + 'process': self._strip_flags(p['process']), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type}. {e}") + + def __extract_ps_uuid2path(self) -> Generator[dict, None, None]: + """ + Extracts process data from UUID2PathParser. + + :return: A generator yielding process data from uuid2path. + """ + entity_type = 'uuid2path' + try: + for p in UUID2PathParser(self.config, self.case_id).get_result().values(): + if self.add_if_full_command_is_not_in_set(self._strip_flags(p)): + yield { + 'process': self._strip_flags(p), + 'timestamp': None, + 'datetime': None, + 'source': entity_type + } + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type}. {e}") + + def __extract_ps_taskinfo(self) -> Generator[dict, None, None]: + """ + Extracts process and thread information from TaskinfoParser. + + :return: A generator yielding process and thread information from taskinfo. 
+ """ + entity_type = 'taskinfo.txt' + try: + for p in TaskinfoParser(self.config, self.case_id).get_result(): + if 'name' not in p: + continue + + if self.add_if_full_path_is_not_in_set(self._strip_flags(p['name'])): + yield { + 'process': self._strip_flags(p['name']), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + + for t in p['threads']: try: - result.append(fast_json_loads(line)) - except Exception: - result.append(line) - elif outputfile == sys.stdout: - for line in iter(process.stdout.readline, ''): - print(line) - else: - with open(outputfile, 'w') as outfd: - for line in iter(process.stdout.readline, ''): - outfd.write(line) - result = f'Output written to {outputfile}' - - return result - - def convert_entry_to_unifiedlog_format(entry: dict) -> dict: - ''' - Convert the entry to unifiedlog format - ''' - # already in the Mandiant unifiedlog format - if 'event_type' in entry: - timestamp = LogarchiveParser.convert_unifiedlog_time_to_datetime(entry['time']) - entry['datetime'] = timestamp.isoformat(timespec='microseconds') - entry['timestamp'] = timestamp.timestamp() - return entry - ''' - jq '. |= keys' logarchive-native.json > native_keys.txt - sort native_keys.txt | uniq -c | sort -n > native_keys_sort_unique.txt - ''' - - mapper = { - 'creatorActivityID': 'activity_id', - 'messageType': 'log_type', - # 'source': '', # not present in the Mandiant format - # 'backtrace': '', # sub-dictionary - 'activityIdentifier': 'activity_id', - 'bootUUID': 'boot_uuid', # remove - in the UUID - 'category': 'category', - 'eventMessage': 'message', - 'eventType': 'event_type', - 'formatString': 'raw_message', - # 'machTimestamp': '', # not present in the Mandiant format - # 'parentActivityIdentifier': '', # not present in the Mandiant format - 'processID': 'pid', - 'processImagePath': 'process', - 'processImageUUID': 'process_uuid', # remove - in the UUID - 'senderImagePath': 'library', - 'senderImageUUID': 'library_uuid', # remove - in the UUID - # 'senderProgramCounter': '', # not present in the Mandiant format - 'subsystem': 'subsystem', - 'threadID': 'thread_id', - 'timestamp': 'time', # requires conversion - 'timezoneName': 'timezone_name', # ignore timezone as time and timestamp are correct - # 'traceID': '', # not present in the Mandiant format - 'userID': 'euid' - } - - new_entry = {} - for key, value in entry.items(): - if key in mapper: - new_key = mapper[key] - if 'uuid' in new_key: # remove - in UUID - new_entry[new_key] = value.replace('-', '') - else: - new_entry[new_key] = value - else: - # keep the non-matching entries - new_entry[key] = value - # convert time - timestamp = datetime.fromisoformat(new_entry['time']) - new_entry['datetime'] = timestamp.isoformat(timespec='microseconds') - new_entry['timestamp'] = timestamp.timestamp() - new_entry['time'] = new_entry['timestamp'] * 1000000000 - - return new_entry - - def convert_native_time_to_unifiedlog_format(time: str) -> int: - timestamp = datetime.fromisoformat(time) - return int(timestamp.timestamp() * 1000000000) - - def convert_unifiedlog_time_to_datetime(time: int) -> datetime: - # convert time to datetime object - timestamp = datetime.fromtimestamp(time / 1000000000, tz=timezone.utc) - return timestamp + thread_name = f"{self._strip_flags(p['name'])}::{t['thread name']}" + if self.add_if_full_path_is_not_in_set(thread_name): + yield { + 'process': thread_name, + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + except KeyError: + pass + except 
Exception as e: + logger.exception(f"ERROR while extracting {entity_type}. {e}") + + def __extract_ps_remotectl_dumpstate(self) -> Generator[dict, None, None]: + """ + Extracts process data from RemotectlDumpstateParser. + + :return: A generator yielding process data from remotectl_dumpstate.txt. + """ + entity_type = 'remotectl_dumpstate.txt' + try: + remotectl_dumpstate_json = RemotectlDumpstateParser(self.config, self.case_id).get_result() + if remotectl_dumpstate_json: + for p in remotectl_dumpstate_json['Local device']['Services']: + if self.add_if_full_path_is_not_in_set(self._strip_flags(p)): + yield { + 'process': self._strip_flags(p), + 'timestamp': None, + 'datetime': None, + 'source': entity_type + } + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type}. {e}") + + def __extract_ps_logdata_statistics(self) -> Generator[dict, None, None]: + """ + Extracts process data from logdata_statistics.jsonl. + + :return: A generator yielding process data from logdata_statistics.jsonl. + """ + entity_type = 'logdata.statistics.jsonl' + try: + for p in LogDataStatisticsParser(self.config, self.case_id).get_result(): + if self.add_if_full_command_is_not_in_set(self._strip_flags(p['process'])): + yield { + 'process': self._strip_flags(p['process']), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type}. {e}") + + def __extract_ps_logdata_statistics_txt(self) -> Generator[dict, None, None]: + """ + Extracts process data from logdata.statistics.txt. + + :return: A generator yielding process data from logdata.statistics.txt. + """ + entity_type = "logdata.statistics.txt" + + try: + for p in LogDataStatisticsTxtParser(self.config, self.case_id).get_result(): + if self.add_if_full_path_is_not_in_set(self._strip_flags(p['process'])): + yield { + 'process': self._strip_flags(p['process']), + 'timestamp': p['timestamp'], + 'datetime': p['datetime'], + 'source': entity_type + } + except Exception as e: + logger.exception(f"ERROR while extracting {entity_type}. {e}") + + def add_if_full_path_is_not_in_set(self, name: str) -> bool: + """ + Ensures that a process path is unique before adding it to the shared set. + + :param name: Process path name + :return: True if the process was not in the set and was added, False otherwise. + """ + for item in self.all_ps: + if item.endswith(name): + return False + if item.split('::')[0].endswith(name): + return False + if '::' not in item and item.split(' ')[0].endswith(name): + return False # This covers cases with space-separated commands + self.all_ps.add(name) + return True + + def add_if_full_command_is_not_in_set(self, name: str) -> bool: + """ + Ensures that a process command is unique before adding it to the shared set. + + :param name: Process command name + :return: True if the process was not in the set and was added, False otherwise. 
+ """ + for item in self.all_ps: + if item.startswith(name): + return False + self.all_ps.add(name) + return True From 364249ac242dc316d4f288fc90d576db391c8d82 Mon Sep 17 00:00:00 2001 From: "roman.gorlach" Date: Wed, 18 Jun 2025 14:52:12 +0200 Subject: [PATCH 4/6] fix: Resolve merge conflicts --- pyproject.toml | 10 +++++++--- requirements.txt | 12 ------------ src/sysdiagnose/parsers/logarchive.py | 1 + 3 files changed, 8 insertions(+), 15 deletions(-) delete mode 100644 requirements.txt diff --git a/pyproject.toml b/pyproject.toml index e0bfa205..f432bcc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,10 +27,14 @@ dependencies = [ "python-dateutil==2.9.0.post0", "gpxpy==1.6.2", "pandas==2.2.3", - "numpy==2.2.0", + "numpy==2.2.5", "nska-deserialize==1.5.1", - "yara-python==4.5.1", - "python-json-logger==3.2.0" + "yara-python==4.5.2", + "python-json-logger==3.3.0", + "python-magic==0.4.27", + "jinja2==3.1.6", + "matplotlib==3.10.1", + "orjson>=3.9.10" ] [project.scripts] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 99e2bbf7..00000000 --- a/requirements.txt +++ /dev/null @@ -1,12 +0,0 @@ -docopt==0.6.2 -graphviz==0.20.3 -tabulate==0.9.0 -python-dateutil==2.9.0.post0 -gpxpy==1.6.2 -pandas==2.2.3 -numpy==2.2.0 -nska-deserialize==1.5.1 -yara-python==4.5.1 -# pycrashreport==1.2.4 -python-json-logger==3.2.0 -orjson>=3.9.10 \ No newline at end of file diff --git a/src/sysdiagnose/parsers/logarchive.py b/src/sysdiagnose/parsers/logarchive.py index 94671968..a667b915 100644 --- a/src/sysdiagnose/parsers/logarchive.py +++ b/src/sysdiagnose/parsers/logarchive.py @@ -16,6 +16,7 @@ import sys import tempfile import shutil +import threading import orjson # fast JSON library # --------------------------------------------# From 5dda650f9f11c296326660fbce0269b0e03a1754 Mon Sep 17 00:00:00 2001 From: Itai Date: Thu, 14 Aug 2025 14:30:33 +0300 Subject: [PATCH 5/6] feat: enhance ps_everywhere with UID tracking and fix spindump hardcoded UID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Port UID tracking to Event dataclass structure from upstream - Store UID/euid/auid in Event.data field instead of custom format - Update all parsers to use Event dataclass with UID data - Fix hardcoded uid=501 in spindumpnosymbols.py - now extracts actual UID from raw data - Add extract_euid_from_tccd_message() for better TCCD log parsing - Improve process uniqueness checking with (process, uid) tuples - Maintains backward compatibility when UID is None 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/sysdiagnose/analysers/ps_everywhere.py | 483 ++++++++++--------- src/sysdiagnose/parsers/spindumpnosymbols.py | 9 +- 2 files changed, 262 insertions(+), 230 deletions(-) diff --git a/src/sysdiagnose/analysers/ps_everywhere.py b/src/sysdiagnose/analysers/ps_everywhere.py index edaca35b..06d1851a 100644 --- a/src/sysdiagnose/analysers/ps_everywhere.py +++ b/src/sysdiagnose/analysers/ps_everywhere.py @@ -1,8 +1,7 @@ #! 
/usr/bin/env python3 from datetime import datetime -from typing import Generator, Set, Optional -from sysdiagnose.parsers import ps +from typing import Generator, Set, Optional, Tuple from sysdiagnose.utils.base import BaseAnalyserInterface, SysdiagnoseConfig, logger, Event from sysdiagnose.parsers.ps import PsParser from sysdiagnose.parsers.psthread import PsThreadParser @@ -22,6 +21,7 @@ class PsEverywhereAnalyser(BaseAnalyserInterface): to build a comprehensive list of running processes across different system logs. The timestamp is 'a' time the process was seen in the log, without being specifically the first or last seen. + This version includes UID tracking for better process uniqueness detection. """ description = "List all processes we can find a bit everywhere." @@ -29,7 +29,7 @@ class PsEverywhereAnalyser(BaseAnalyserInterface): def __init__(self, config: SysdiagnoseConfig, case_id: str): super().__init__(__file__, config, case_id) - self.all_ps: Set[str] = set() + self.all_ps: Set[Tuple[str, Optional[int]]] = set() # Stores tuples of (process, uid/euid) @staticmethod def _strip_flags(process: str) -> str: @@ -43,119 +43,40 @@ def _strip_flags(process: str) -> str: return process @staticmethod - def message_extract_binary(process: str, message: str) -> Optional[str | list[str]]: + def extract_euid_from_tccd_message(message: str, binary_path: str) -> Optional[int]: """ - Extracts process_name from special messages: - 1. backboardd Signpost messages with process_name - 2. tccd process messages with binary_path - 3. '/kernel' process messages with app name mapping format 'App Name -> /path/to/app' - 4. configd SCDynamicStore client sessions showing connected processes - - :param process: Process name. - :param message: Log message potentially containing process information. - :return: Extracted process name, list of process names, or None if not found. + Extracts the euid for a specific binary_path from a tccd message. 
+ + :param message: Log message containing process information with euid + :param binary_path: The specific binary path to extract euid for + :return: The euid as an integer, or None if not found """ - # Case 1: Backboardd Signpost messages - if process == '/usr/libexec/backboardd' and 'Signpost' in message and 'process_name=' in message: - try: - # Find the process_name part in the message - process_name_start = message.find('process_name=') - if process_name_start != -1: - # Extract from after 'process_name=' to the next space or end of string - process_name_start += len('process_name=') - process_name_end = message.find(' ', process_name_start) - - if process_name_end == -1: # If no space after process_name - return message[process_name_start:] - else: - return message[process_name_start:process_name_end] - except Exception as e: - logger.debug(f"Error extracting process_name from backboardd: {e}") - - # Case 2: TCCD process messages - if process == '/System/Library/PrivateFrameworks/TCC.framework/Support/tccd' and 'binary_path=' in message: - try: - # Extract only the clean binary paths without additional context - binary_paths = [] - - # Find all occurrences of binary_path= in the message - start_pos = 0 - while True: - binary_path_start = message.find('binary_path=', start_pos) - if binary_path_start == -1: - break - - binary_path_start += len('binary_path=') - # Find the end of the path (comma, closing bracket, or end of string) - binary_path_end = None - for delimiter in [',', '}', ' access to', ' is checking']: - delimiter_pos = message.find(delimiter, binary_path_start) - if delimiter_pos != -1 and (binary_path_end is None or delimiter_pos < binary_path_end): - binary_path_end = delimiter_pos - - if binary_path_end is None: - path = message[binary_path_start:].strip() - else: - path = message[binary_path_start:binary_path_end].strip() - - # Skip paths with excessive information - if len(path) > 0 and path.startswith('/') and ' ' not in path: - binary_paths.append(path) - - # Move to position after the current binary_path - start_pos = binary_path_start + 1 - - # Return all valid binary paths - if binary_paths: - logger.debug(f"Extracted {len(binary_paths)} binary paths from tccd message") - return binary_paths if len(binary_paths) > 1 else binary_paths[0] - - except Exception as e: - logger.debug(f"Error extracting binary_path from tccd: {e}") - - # Case 3: /kernel process with App name mapping pattern "App Name -> /path/to/app" - if process == '/kernel' and ' -> ' in message and 'App Store Fast Path' in message: - try: - # Find the arrow mapping pattern - arrow_pos = message.find(' -> ') - if arrow_pos != -1: - path_start = arrow_pos + len(' -> ') - # Look for common path patterns - more flexible for kernel messages - if message[path_start:].startswith('/'): - # Find the end of the path (space or end of string) - path_end = message.find(' ', path_start) - if path_end == -1: # If no space after path - return message[path_start:] - else: - return message[path_start:path_end] - except Exception as e: - logger.debug(f"Error extracting app path from kernel mapping: {e}") - - # Case 4: configd SCDynamicStore client sessions - if process == '/usr/libexec/configd' and 'SCDynamicStore/client sessions' in message: - try: - # Process the list of connected clients from configd - process_paths = [] - lines = message.split('\n') - for line in lines: - line = line.strip() - if line.startswith('"') and '=' in line: - # Extract the client path from lines like ""/usr/sbin/mDNSResponder:null" = 
1;" - client_path = line.split('"')[1] # Get the part between the first pair of quotes - if ':' in client_path: - # Extract the actual process path part (before the colon) - process_path = client_path.split(':')[0] - if process_path.startswith('/') or process_path.startswith('com.apple.'): - process_paths.append(process_path) - - # Return the list of process paths if any were found - if process_paths: - logger.debug(f"Extracted {len(process_paths)} process paths from configd message") - return process_paths - except Exception as e: - logger.debug(f"Error extracting client paths from configd SCDynamicStore: {e}") - - return None + try: + # Look for pattern: euid=XXX before binary_path=/path/to/binary + search_pattern = f'binary_path={binary_path}' + start_pos = message.find(search_pattern) + if start_pos == -1: + return None + + # Get the substring before binary_path and find the last euid= + before_binary = message[:start_pos] + euid_pos = before_binary.rfind('euid=') + if euid_pos == -1: + return None + + # Extract the euid value + euid_start = euid_pos + len('euid=') + euid_end = message.find(',', euid_start) + if euid_end == -1: + euid_end = message.find(' ', euid_start) + if euid_end == -1: + euid_end = message.find('}', euid_start) + + euid_str = message[euid_start:euid_end].strip() if euid_end != -1 else message[euid_start:].strip() + return int(euid_str) + except (ValueError, AttributeError) as e: + logger.debug(f"Error extracting euid for binary {binary_path}: {e}") + return None def execute(self) -> Generator[dict, None, None]: """ @@ -170,78 +91,79 @@ def execute(self) -> Generator[dict, None, None]: def __extract_ps_base_file(self) -> Generator[dict, None, None]: """ Extracts process data from ps.txt. - - :return: A generator yielding dictionaries containing process details from ps.txt. """ entity_type = 'ps.txt' try: for p in PsParser(self.config, self.case_id).get_result(): - ps_event = Event( - datetime=datetime.fromisoformat(p['datetime']), - message= self._strip_flags(p['data']['command']), - timestamp_desc=p['timestamp_desc'], - module=self.module_name, - data={'source': entity_type} - ) - if self.add_if_full_command_is_not_in_set(ps_event.message): - yield ps_event.to_dict() + uid = p['data'].get('uid') + process_name = self._strip_flags(p['data']['command']) + + if self.add_if_full_command_is_not_in_set(process_name, uid): + yield Event( + datetime=datetime.fromisoformat(p['datetime']), + message=process_name, + timestamp_desc=p['timestamp_desc'], + module=self.module_name, + data={'source': entity_type, 'uid': uid} + ).to_dict() except Exception as e: logger.exception(f"ERROR while extracting {entity_type} file. {e}") def __extract_ps_thread_file(self) -> Generator[dict, None, None]: """ Extracts process data from psthread.txt. - - :return: A generator yielding dictionaries containing process details from psthread.txt. 
""" entity_type = 'psthread.txt' try: for p in PsThreadParser(self.config, self.case_id).get_result(): - ps_event = Event( - datetime=datetime.fromisoformat(p['datetime']), - message=self._strip_flags(p['data']['command']), - timestamp_desc=p['timestamp_desc'], - module=self.module_name, - data={'source': entity_type} - ) - if self.add_if_full_command_is_not_in_set(ps_event.message): - yield ps_event.to_dict() + uid = p['data'].get('uid') + process_name = self._strip_flags(p['data']['command']) + + if self.add_if_full_command_is_not_in_set(process_name, uid): + yield Event( + datetime=datetime.fromisoformat(p['datetime']), + message=process_name, + timestamp_desc=p['timestamp_desc'], + module=self.module_name, + data={'source': entity_type, 'uid': uid} + ).to_dict() except Exception as e: logger.exception(f"ERROR while extracting {entity_type} file. {e}") def __extract_ps_spindump_nosymbols_file(self) -> Generator[dict, None, None]: """ Extracts process data from spindump-nosymbols.txt. - - :return: A generator yielding dictionaries containing process and thread details from spindump-nosymbols.txt. """ entity_type = 'spindump-nosymbols.txt' try: - for event in SpindumpNoSymbolsParser(self.config, self.case_id).get_result(): - p = event['data'] + for event_data in SpindumpNoSymbolsParser(self.config, self.case_id).get_result(): + p = event_data['data'] if 'process' not in p: continue + process_name = p.get('path', '/kernel' if p['process'] == 'kernel_task [0]' else p['process']) - - if self.add_if_full_command_is_not_in_set(self._strip_flags(process_name)): + uid = p.get('uid') + + if self.add_if_full_command_is_not_in_set(self._strip_flags(process_name), uid): yield Event( - datetime=datetime.fromisoformat(event['datetime']), + datetime=datetime.fromisoformat(event_data['datetime']), message=self._strip_flags(process_name), - timestamp_desc=event['timestamp_desc'], + timestamp_desc=event_data['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': uid} ).to_dict() - for t in p['threads']: + # Process threads + for t in p.get('threads', []): try: thread_name = f"{self._strip_flags(process_name)}::{t['thread_name']}" - if self.add_if_full_command_is_not_in_set(thread_name): + if self.add_if_full_command_is_not_in_set(thread_name, uid): yield Event( - datetime=datetime.fromisoformat(event['datetime']), - message=self._strip_flags(thread_name), - timestamp_desc=event['timestamp_desc'], + datetime=datetime.fromisoformat(event_data['datetime']), + message=thread_name, + timestamp_desc=event_data['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': uid} ).to_dict() except KeyError: pass @@ -251,19 +173,24 @@ def __extract_ps_spindump_nosymbols_file(self) -> Generator[dict, None, None]: def __extract_ps_shutdownlogs(self) -> Generator[dict, None, None]: """ Extracts process data from shutdown logs. - - :return: A generator yielding dictionaries containing process details from shutdown logs. 
""" entity_type = 'shutdown.logs' try: for p in ShutdownLogsParser(self.config, self.case_id).get_result(): - if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['command'])): + uid = p['data'].get('uid') + auid = p['data'].get('auid') + process_name = self._strip_flags(p['data']['command']) + + # Use uid for uniqueness check, fallback to auid + uniqueness_uid = uid if uid is not None else auid + + if self.add_if_full_command_is_not_in_set(process_name, uniqueness_uid): yield Event( datetime=datetime.fromisoformat(p['datetime']), - message=self._strip_flags(p['data']['command']), + message=process_name, timestamp_desc=p['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': uid, 'auid': auid} ).to_dict() except Exception as e: logger.exception(f"ERROR while extracting {entity_type}. {e}") @@ -271,66 +198,137 @@ def __extract_ps_shutdownlogs(self) -> Generator[dict, None, None]: def __extract_ps_logarchive(self) -> Generator[dict, None, None]: """ Extracts process data from logarchive. - - :return: A generator yielding dictionaries containing process details from logarchive. """ entity_type = 'log archive' try: for p in LogarchiveParser(self.config, self.case_id).get_result(): - # First check if we can extract a binary from the message - extracted_process = self.message_extract_binary(p['data']['process'], p['message']) - if extracted_process: - # Handle the case where extracted_process is a list of paths - if isinstance(extracted_process, list): - for proc_path in extracted_process: - if self.add_if_full_command_is_not_in_set(self._strip_flags(proc_path)): - yield Event( - datetime.fromisoformat(p['datetime']), - message=self._strip_flags(proc_path), - timestamp_desc=p['timestamp_desc'], - module=self.module_name, - data={'source': entity_type} - ).to_dict() - else: - # Handle the case where it's a single string - if self.add_if_full_command_is_not_in_set(self._strip_flags(extracted_process)): + # First check for special message patterns that contain process information + if 'message' in p: + extracted_processes = self._extract_processes_from_message( + p['data'].get('process', ''), + p['message'] + ) + for proc_info in extracted_processes: + process_path = proc_info['path'] + euid = proc_info.get('euid', p['data'].get('euid')) + + if self.add_if_full_command_is_not_in_set(self._strip_flags(process_path), euid): yield Event( datetime=datetime.fromisoformat(p['datetime']), - message=self._strip_flags(extracted_process), + message=self._strip_flags(process_path), timestamp_desc=p['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type + ' message', 'uid': euid} ).to_dict() # Process the original process name - if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['process'])): + euid = p['data'].get('euid') + process_name = self._strip_flags(p['data']['process']) + + if self.add_if_full_command_is_not_in_set(process_name, euid): yield Event( datetime=datetime.fromisoformat(p['datetime']), - message=self._strip_flags(p['data']['process']), + message=process_name, timestamp_desc=p['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': euid} ).to_dict() except Exception as e: logger.exception(f"ERROR while extracting {entity_type}. {e}") + def _extract_processes_from_message(self, process: str, message: str) -> list[dict]: + """ + Extracts process information from special log messages. 
+ Returns a list of dicts with 'path' and optionally 'euid' keys. + """ + processes = [] + + # Case 1: Backboardd Signpost messages + if process == '/usr/libexec/backboardd' and 'Signpost' in message and 'process_name=' in message: + try: + process_name_start = message.find('process_name=') + if process_name_start != -1: + process_name_start += len('process_name=') + process_name_end = message.find(' ', process_name_start) + path = message[process_name_start:process_name_end] if process_name_end != -1 else message[process_name_start:] + processes.append({'path': path}) + except Exception as e: + logger.debug(f"Error extracting process_name from backboardd: {e}") + + # Case 2: TCCD process messages with binary_path and euid + elif process == '/System/Library/PrivateFrameworks/TCC.framework/Support/tccd' and 'binary_path=' in message: + try: + start_pos = 0 + while True: + binary_path_start = message.find('binary_path=', start_pos) + if binary_path_start == -1: + break + + binary_path_start += len('binary_path=') + binary_path_end = None + for delimiter in [',', '}', ' access to', ' is checking']: + delimiter_pos = message.find(delimiter, binary_path_start) + if delimiter_pos != -1 and (binary_path_end is None or delimiter_pos < binary_path_end): + binary_path_end = delimiter_pos + + path = message[binary_path_start:binary_path_end].strip() if binary_path_end else message[binary_path_start:].strip() + + if path and path.startswith('/') and ' ' not in path: + proc_info = {'path': path} + # Try to extract euid for this specific binary + euid = self.extract_euid_from_tccd_message(message, path) + if euid is not None: + proc_info['euid'] = euid + processes.append(proc_info) + + start_pos = binary_path_start + 1 + except Exception as e: + logger.debug(f"Error extracting binary_path from tccd: {e}") + + # Case 3: /kernel process with App name mapping + elif process == '/kernel' and ' -> ' in message and 'App Store Fast Path' in message: + try: + arrow_pos = message.find(' -> ') + if arrow_pos != -1: + path_start = arrow_pos + len(' -> ') + if message[path_start:].startswith('/'): + path_end = message.find(' ', path_start) + path = message[path_start:path_end] if path_end != -1 else message[path_start:] + processes.append({'path': path}) + except Exception as e: + logger.debug(f"Error extracting app path from kernel mapping: {e}") + + # Case 4: configd SCDynamicStore client sessions + elif process == '/usr/libexec/configd' and 'SCDynamicStore/client sessions' in message: + try: + lines = message.split('\n') + for line in lines: + line = line.strip() + if line.startswith('"') and '=' in line: + client_path = line.split('"')[1] + if ':' in client_path: + process_path = client_path.split(':')[0] + if process_path.startswith('/') or process_path.startswith('com.apple.'): + processes.append({'path': process_path}) + except Exception as e: + logger.debug(f"Error extracting client paths from configd: {e}") + + return processes + def __extract_ps_uuid2path(self) -> Generator[dict, None, None]: """ Extracts process data from UUID2PathParser. - - :return: A generator yielding process data from uuid2path. """ entity_type = 'uuid2path' try: for p in UUID2PathParser(self.config, self.case_id).get_result().values(): - if self.add_if_full_command_is_not_in_set(self._strip_flags(p)): - # FIXME: what timestamp to use here? 
+ if self.add_if_full_command_is_not_in_set(self._strip_flags(p), None): yield Event( datetime=self.sysdiagnose_creation_datetime, message=self._strip_flags(p), timestamp_desc="Process path from UUID existing at sysdiagnose creation time", module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': None} ).to_dict() except Exception as e: logger.exception(f"ERROR while extracting {entity_type}. {e}") @@ -338,8 +336,6 @@ def __extract_ps_uuid2path(self) -> Generator[dict, None, None]: def __extract_ps_taskinfo(self) -> Generator[dict, None, None]: """ Extracts process and thread information from TaskinfoParser. - - :return: A generator yielding process and thread information from taskinfo. """ entity_type = 'taskinfo.txt' try: @@ -347,25 +343,33 @@ def __extract_ps_taskinfo(self) -> Generator[dict, None, None]: if 'name' not in p['data']: continue - if self.add_if_full_path_is_not_in_set(self._strip_flags(p['data']['name'])): + uid = p['data'].get('uid') + auid = p['data'].get('auid') + process_name = self._strip_flags(p['data']['name']) + + # Use uid for uniqueness check, fallback to auid + uniqueness_uid = uid if uid is not None else auid + + if self.add_if_full_path_is_not_in_set(process_name, uniqueness_uid): yield Event( datetime=datetime.fromisoformat(p['datetime']), - message=self._strip_flags(p['data']['name']), + message=process_name, timestamp_desc=p['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': uid, 'auid': auid} ).to_dict() - for t in p['data']['threads']: + # Process threads + for t in p['data'].get('threads', []): try: - thread_name = f"{self._strip_flags(p['data']['name'])}::{t['thread name']}" - if self.add_if_full_path_is_not_in_set(thread_name): + thread_name = f"{process_name}::{t['thread name']}" + if self.add_if_full_path_is_not_in_set(thread_name, uniqueness_uid): yield Event( - datetime.fromisoformat(p['datetime']), + datetime=datetime.fromisoformat(p['datetime']), message=thread_name, timestamp_desc=p['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': uid, 'auid': auid} ).to_dict() except KeyError: pass @@ -375,22 +379,19 @@ def __extract_ps_taskinfo(self) -> Generator[dict, None, None]: def __extract_ps_remotectl_dumpstate(self) -> Generator[dict, None, None]: """ Extracts process data from RemotectlDumpstateParser. - - :return: A generator yielding process data from remotectl_dumpstate.txt. """ entity_type = 'remotectl_dumpstate.txt' try: remotectl_dumpstate_json = RemotectlDumpstateParser(self.config, self.case_id).get_result() if remotectl_dumpstate_json: for p in remotectl_dumpstate_json['Local device']['Services']: - if self.add_if_full_path_is_not_in_set(self._strip_flags(p)): - # FIXME: what timestamp to use here? + if self.add_if_full_path_is_not_in_set(self._strip_flags(p), None): yield Event( datetime=self.sysdiagnose_creation_datetime, message=self._strip_flags(p), timestamp_desc="Existing service at sysdiagnose creation time", module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': None} ).to_dict() except Exception as e: logger.exception(f"ERROR while extracting {entity_type}. {e}") @@ -398,19 +399,20 @@ def __extract_ps_remotectl_dumpstate(self) -> Generator[dict, None, None]: def __extract_ps_logdata_statistics(self) -> Generator[dict, None, None]: """ Extracts process data from logdata_statistics.jsonl. 
- - :return: A generator yielding process data from logdata_statistics.jsonl. """ entity_type = 'logdata.statistics.jsonl' try: for p in LogDataStatisticsParser(self.config, self.case_id).get_result(): - if self.add_if_full_command_is_not_in_set(self._strip_flags(p['data']['process'])): + uid = p['data'].get('uid', p['data'].get('euid')) + process_name = self._strip_flags(p['data']['process']) + + if self.add_if_full_command_is_not_in_set(process_name, uid): yield Event( datetime=datetime.fromisoformat(p['datetime']), - message=self._strip_flags(p['data']['process']), + message=process_name, timestamp_desc=p['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': uid} ).to_dict() except Exception as e: logger.exception(f"ERROR while extracting {entity_type}. {e}") @@ -418,51 +420,74 @@ def __extract_ps_logdata_statistics(self) -> Generator[dict, None, None]: def __extract_ps_logdata_statistics_txt(self) -> Generator[dict, None, None]: """ Extracts process data from logdata.statistics.txt. - - :return: A generator yielding process data from logdata.statistics.txt. """ entity_type = "logdata.statistics.txt" - try: for p in LogDataStatisticsTxtParser(self.config, self.case_id).get_result(): - if self.add_if_full_path_is_not_in_set(self._strip_flags(p['data']['process'])): + uid = p['data'].get('uid', p['data'].get('euid')) + process_name = self._strip_flags(p['data']['process']) + + if self.add_if_full_path_is_not_in_set(process_name, uid): yield Event( datetime=datetime.fromisoformat(p['datetime']), - message=self._strip_flags(p['data']['process']), + message=process_name, timestamp_desc=p['timestamp_desc'], module=self.module_name, - data={'source': entity_type} + data={'source': entity_type, 'uid': uid} ).to_dict() - except Exception as e: logger.exception(f"ERROR while extracting {entity_type}. {e}") - def add_if_full_path_is_not_in_set(self, name: str) -> bool: + def add_if_full_path_is_not_in_set(self, name: str, uid: Optional[int] = None) -> bool: """ - Ensures that a process path is unique before adding it to the shared set. + Ensures that a process path with uid is unique before adding it to the shared set. :param name: Process path name + :param uid: User ID (can be uid, euid, or auid) :return: True if the process was not in the set and was added, False otherwise. """ - for item in self.all_ps: - if item.endswith(name): - return False - if item.split('::')[0].endswith(name): - return False - if '::' not in item and item.split(' ')[0].endswith(name): - return False # This covers cases with space-separated commands - self.all_ps.add(name) + key = (name, uid) + + # Check if this exact combination already exists + if key in self.all_ps: + return False + + # For backward compatibility, check if process exists without considering uid + # when either the new or existing uid is None + for item_name, item_uid in self.all_ps: + if item_name.endswith(name): + if uid is None or item_uid is None: + return False + if item_name.split('::')[0].endswith(name): + if uid is None or item_uid is None: + return False + if '::' not in item_name and item_name.split(' ')[0].endswith(name): + if uid is None or item_uid is None: + return False + + self.all_ps.add(key) return True - def add_if_full_command_is_not_in_set(self, name: str) -> bool: + def add_if_full_command_is_not_in_set(self, name: str, uid: Optional[int] = None) -> bool: """ - Ensures that a process command is unique before adding it to the shared set. 
+ Ensures that a process command with uid is unique before adding it to the shared set. :param name: Process command name + :param uid: User ID (can be uid, euid, or auid) :return: True if the process was not in the set and was added, False otherwise. """ - for item in self.all_ps: - if item.startswith(name): - return False - self.all_ps.add(name) - return True + key = (name, uid) + + # Check if this exact combination already exists + if key in self.all_ps: + return False + + # For backward compatibility, check if process exists without considering uid + # when either the new or existing uid is None + for item_name, item_uid in self.all_ps: + if item_name.startswith(name): + if uid is None or item_uid is None: + return False + + self.all_ps.add(key) + return True \ No newline at end of file diff --git a/src/sysdiagnose/parsers/spindumpnosymbols.py b/src/sysdiagnose/parsers/spindumpnosymbols.py index a86c3f77..fbea1ce0 100644 --- a/src/sysdiagnose/parsers/spindumpnosymbols.py +++ b/src/sysdiagnose/parsers/spindumpnosymbols.py @@ -181,7 +181,14 @@ def parse_process(data): process['parent'] = process['parent'].split("[", 1)[0].strip() except KeyError: # some don't have a parent pass - process['uid'] = 501 + # Extract UID from parsed data if available, convert to int + if 'uid' in process and process['uid']: + try: + process['uid'] = int(process['uid']) + except (ValueError, TypeError): + process['uid'] = None + else: + process['uid'] = None return process From 8abdf4113e810e0b8c98179bfa88725c5532210a Mon Sep 17 00:00:00 2001 From: Itai Date: Fri, 15 Aug 2025 12:04:46 +0300 Subject: [PATCH 6/6] chore: update .gitignore to exclude local development files - Added .claude/ directory - Added crashes_mine/ directory - Added debug_spindump.py - Added test_euid_extraction.py - Added ps_everywhere_verbose.txt These are local development and testing files that should not be tracked in the repository. --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index d1a9af7a..a10c1711 100644 --- a/.gitignore +++ b/.gitignore @@ -73,3 +73,6 @@ venv/ .Trashes ehthumbs.db Thumbs.db + +# Local development and testing files +.claude/ \ No newline at end of file
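
For reference, the euid extraction introduced in PATCH 5/6 (extract_euid_from_tccd_message) can be exercised in isolation. The sketch below reimplements the same backward-scan idea on a hypothetical tccd-style message; the identifiers, pid and binary path are invented for illustration, and only the euid=.../binary_path=... key layout is assumed.

from typing import Optional

def euid_for_binary(message: str, binary_path: str) -> Optional[int]:
    # Locate the binary_path=<path> occurrence we care about.
    start = message.find(f'binary_path={binary_path}')
    if start == -1:
        return None
    # Scan backwards for the closest euid= preceding that binary_path.
    euid_pos = message.rfind('euid=', 0, start)
    if euid_pos == -1:
        return None
    value_start = euid_pos + len('euid=')
    # The euid value ends at the first delimiter that follows it.
    value_end = len(message)
    for delim in (',', ' ', '}'):
        pos = message.find(delim, value_start)
        if pos != -1:
            value_end = min(value_end, pos)
    try:
        return int(message[value_start:value_end])
    except ValueError:
        return None

# Hypothetical tccd-style message, for illustration only.
msg = ('AUTHREQ_CTX: msgID=4242.1, function=TCCAccessRequest, '
       'requesting={TCCDProcess: identifier=com.example.app, pid=4242, '
       'auid=501, euid=501, '
       'binary_path=/Applications/Example.app/Contents/MacOS/Example},')
print(euid_for_binary(msg, '/Applications/Example.app/Contents/MacOS/Example'))  # 501

Because the scan takes the euid= occurrence closest before the requested binary_path=, a message that mentions several binary paths resolves each path to its own euid, matching how _extract_processes_from_message() calls the helper once per extracted binary_path.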
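
The (process, uid) uniqueness rule from the same patch can be illustrated standalone as well: the same command seen under two different uids is recorded twice, while a sighting whose uid is unknown falls back to command-only matching. A minimal sketch of the add_if_full_command_is_not_in_set() behaviour, with made-up paths and uids:

from typing import Optional, Set, Tuple

seen: Set[Tuple[str, Optional[int]]] = set()

def add_command(name: str, uid: Optional[int] = None) -> bool:
    # Exact (command, uid) pair already recorded.
    if (name, uid) in seen:
        return False
    # When either uid is unknown, fall back to command-only matching,
    # mirroring the backward-compatible branch in the patch.
    for seen_name, seen_uid in seen:
        if seen_name.startswith(name) and (uid is None or seen_uid is None):
            return False
    seen.add((name, uid))
    return True

print(add_command('/usr/sbin/mDNSResponder', 65))    # True: first sighting
print(add_command('/usr/sbin/mDNSResponder', 0))     # True: same command, different uid
print(add_command('/usr/sbin/mDNSResponder', None))  # False: uid unknown, command already seen
print(add_command('/usr/sbin/mDNSResponder', 65))    # False: exact pair already recorded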