From d2ae867ed160892225103c9dc73aa13051336d66 Mon Sep 17 00:00:00 2001 From: kirtimanmishrazipstack Date: Fri, 5 Sep 2025 19:05:17 +0530 Subject: [PATCH] sidecar implementation sigterm --- .../unstract/tool_sidecar/log_processor.py | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tool-sidecar/src/unstract/tool_sidecar/log_processor.py b/tool-sidecar/src/unstract/tool_sidecar/log_processor.py index 86ade0cd72..a2ade732d0 100644 --- a/tool-sidecar/src/unstract/tool_sidecar/log_processor.py +++ b/tool-sidecar/src/unstract/tool_sidecar/log_processor.py @@ -7,6 +7,7 @@ import json import logging import os +import signal import time from datetime import UTC, datetime from typing import Any @@ -25,6 +26,17 @@ logger = logging.getLogger(__name__) +# Global shutdown flag for graceful termination +_shutdown_requested = False + + +def signal_handler(signum, frame): + """Handle shutdown signals gracefully.""" + global _shutdown_requested + signal_name = signal.Signals(signum).name + logger.info(f"Received {signal_name}, initiating graceful shutdown...") + _shutdown_requested = True + class LogProcessor: def __init__( @@ -210,7 +222,9 @@ def is_valid_log_type(self, log_type: str | None) -> bool: def monitor_logs(self) -> None: """Main loop to monitor log file for new content and completion signals. Uses file polling with position tracking to efficiently read new lines. + Handles graceful shutdown via SIGTERM. """ + global _shutdown_requested logger.info("Starting log monitoring...") if not self.wait_for_log_file(): raise TimeoutError("Log file was not created within timeout period") @@ -218,6 +232,12 @@ def monitor_logs(self) -> None: # Monitor the file for new content with open(self.log_path) as f: while True: + # Check for shutdown signal + if _shutdown_requested: + logger.info("Shutdown requested, performing final log collection...") + self._final_log_collection(f) + break + # Remember current position where = f.tell() line = f.readline() @@ -241,11 +261,43 @@ def monitor_logs(self) -> None: logger.info("Completion signal received") break + logger.info("Log monitoring completed") + + def _final_log_collection(self, file_handle) -> None: + """Perform final collection of any remaining logs before shutdown.""" + logger.info("Performing final log collection...") + + # Give main container brief moment to write final logs + time.sleep(0.2) + + lines_collected = 0 + + # Read any remaining lines in the file + while True: + line = file_handle.readline() + if not line: + break + + lines_collected += 1 + log_line = self.process_log_line(line.strip()) + + if log_line.is_terminated: + logger.info("Found completion signal during final collection") + + logger.info( + f"Final log collection completed, processed {lines_collected} remaining lines" + ) + def main(): """Main entry point for the sidecar container. Sets up the log processor with environment variables and starts monitoring. """ + # Set up signal handlers for graceful shutdown + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + logger.info("Signal handlers registered for graceful shutdown") + # Get configuration from environment log_path = os.getenv(Env.LOG_PATH, "/shared/logs/logs.txt") redis_host = os.getenv(Env.REDIS_HOST)