From ce05e1c8c2d189856106bdfefe3b5140da5ec7b6 Mon Sep 17 00:00:00 2001 From: vishnuszipstack Date: Tue, 26 Aug 2025 10:38:05 +0530 Subject: [PATCH 1/3] in progress --- .../endpoint_v2/queue_utils.py | 106 ++++++++++++++++++ .../filesystem/file_storage_config.py | 2 + .../unstract/filesystem/file_storage_types.py | 1 + 3 files changed, 109 insertions(+) diff --git a/backend/workflow_manager/endpoint_v2/queue_utils.py b/backend/workflow_manager/endpoint_v2/queue_utils.py index cda73fbe16..6e5e183cef 100644 --- a/backend/workflow_manager/endpoint_v2/queue_utils.py +++ b/backend/workflow_manager/endpoint_v2/queue_utils.py @@ -4,6 +4,7 @@ from enum import Enum from typing import Any +from django.conf import settings from utils.constants import Common from workflow_manager.endpoint_v2.exceptions import UnstractQueueException @@ -22,6 +23,20 @@ class QueueResultStatus(Enum): class QueueUtils: @staticmethod def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue: + """Get queue connector instance based on configuration. + + For HITL operations, this can return PostgreSQL, Hybrid, or Redis connectors + based on the HITL_QUEUE_BACKEND setting. + """ + # Check if this is for HITL operations + is_hitl = connector_settings.get("use_hitl_backend", False) + + if is_hitl: + # Use HITL-specific queue backend + hitl_backend = getattr(settings, "HITL_QUEUE_BACKEND", "hybrid") + return QueueUtils.get_hitl_queue_inst(hitl_backend, connector_settings) + + # Default behavior for non-HITL operations if not queue_connectors: raise UnstractQueueException(detail="Queue connector not exists") queue_connector_key = next(iter(queue_connectors)) @@ -31,6 +46,97 @@ def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue: connector_class: UnstractQueue = connector(connector_settings) return connector_class + @staticmethod + def get_hitl_queue_inst( + backend: str, connector_settings: dict[str, Any] = {} + ) -> UnstractQueue: + """Get HITL-specific queue connector instance with dynamic imports. + + This method uses dynamic imports to avoid hard dependencies on the + manual_review_v2 pluggable app, allowing graceful degradation when + the app is not available. + + Args: + backend: Backend type ('postgresql', 'hybrid', 'redis') + connector_settings: Connector configuration + + Returns: + Configured queue connector instance + + Raises: + UnstractQueueException: When HITL connectors are not available + """ + # For Redis backend, use default connector + if backend == "redis": + return QueueUtils.get_queue_inst(connector_settings) + + # For PostgreSQL and Hybrid backends, try dynamic imports + try: + if backend == "postgresql": + connector_class = QueueUtils._import_hitl_connector("PostgreSQLQueue") + return connector_class(connector_settings) + + elif backend == "hybrid": + connector_class = QueueUtils._import_hitl_connector("HybridQueue") + return connector_class(connector_settings) + + else: + logger.warning( + f"Unknown HITL queue backend '{backend}'. " + f"Valid options: postgresql, hybrid, redis. " + f"Attempting fallback to hybrid." + ) + connector_class = QueueUtils._import_hitl_connector("HybridQueue") + return connector_class(connector_settings) + + except ImportError as e: + logger.error( + f"HITL queue backend '{backend}' not available: {e}. " + f"Make sure 'pluggable_apps.manual_review_v2' is installed and configured." + ) + raise UnstractQueueException( + detail=f"HITL queue backend '{backend}' not available. " + f"Please install the manual_review_v2 app or use 'redis' backend." + ) + except Exception as e: + logger.error(f"Failed to initialize HITL queue backend '{backend}': {e}") + raise UnstractQueueException( + detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}" + ) + + @staticmethod + def _import_hitl_connector(connector_name: str): + """Dynamically import HITL connector class. + + Args: + connector_name: Name of the connector class to import + + Returns: + The imported connector class + + Raises: + ImportError: When the connector cannot be imported + """ + try: + from pluggable_apps.manual_review_v2.connectors import ( + HybridQueue, + PostgreSQLQueue, + ) + + connectors = { + "PostgreSQLQueue": PostgreSQLQueue, + "HybridQueue": HybridQueue, + } + + if connector_name not in connectors: + raise ImportError(f"Unknown HITL connector: {connector_name}") + + return connectors[connector_name] + + except ImportError as e: + logger.debug(f"Failed to import HITL connector '{connector_name}': {e}") + raise + @staticmethod def calculate_remaining_ttl(enqueued_at: float, ttl_seconds: int) -> int | None: """Calculate remaining TTL based on original enqueue time and TTL. diff --git a/unstract/filesystem/src/unstract/filesystem/file_storage_config.py b/unstract/filesystem/src/unstract/filesystem/file_storage_config.py index 85d45ad5ad..27440d2c1f 100644 --- a/unstract/filesystem/src/unstract/filesystem/file_storage_config.py +++ b/unstract/filesystem/src/unstract/filesystem/file_storage_config.py @@ -28,9 +28,11 @@ def get_provider(var_name: str, default: str = "minio") -> FileStorageProvider: STORAGE_MAPPING = { FileStorageType.WORKFLOW_EXECUTION: StorageType.SHARED_TEMPORARY, FileStorageType.API_EXECUTION: StorageType.SHARED_TEMPORARY, + FileStorageType.HITL_FILES: StorageType.SHARED_TEMPORARY, # Use shared temporary for HITL files } FILE_STORAGE_CREDENTIALS_TO_ENV_NAME_MAPPING = { FileStorageType.WORKFLOW_EXECUTION: "WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS", FileStorageType.API_EXECUTION: "API_FILE_STORAGE_CREDENTIALS", + FileStorageType.HITL_FILES: "HITL_FILE_STORAGE_CREDENTIALS", } diff --git a/unstract/filesystem/src/unstract/filesystem/file_storage_types.py b/unstract/filesystem/src/unstract/filesystem/file_storage_types.py index be9889c89d..2b6956fe69 100644 --- a/unstract/filesystem/src/unstract/filesystem/file_storage_types.py +++ b/unstract/filesystem/src/unstract/filesystem/file_storage_types.py @@ -4,3 +4,4 @@ class FileStorageType(Enum): WORKFLOW_EXECUTION = "WORKFLOW_EXECUTION" API_EXECUTION = "API_EXECUTION" + HITL_FILES = "HITL_FILES" From c41d33415edb04d71056ad7287724fbd00af5ffc Mon Sep 17 00:00:00 2001 From: vishnuszipstack Date: Tue, 23 Sep 2025 14:33:37 +0530 Subject: [PATCH 2/3] hitl enhancement updates --- .../endpoint_v2/destination.py | 39 ++++++++++---- .../endpoint_v2/queue_utils.py | 51 ++++++++++++------- .../filesystem/file_storage_config.py | 2 +- 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index d50d49f904..462b6361a2 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -219,7 +219,6 @@ def handle_output( """Handle the output based on the connection type.""" connection_type = self.endpoint.connection_type tool_execution_result: str | None = None - if connection_type == WorkflowEndpoint.ConnectionType.FILESYSTEM: self.copy_output_to_output_directory() elif connection_type == WorkflowEndpoint.ConnectionType.DATABASE: @@ -800,9 +799,21 @@ def _push_to_queue( queue_result_json = json.dumps(queue_result) - conn = QueueUtils.get_queue_inst() - conn.enqueue(queue_name=q_name, message=queue_result_json) - logger.info(f"Pushed {file_name} to queue {q_name} with file content") + # Get organization_id for proper HITL queue filtering + organization_id = UserContext.get_organization_identifier() + conn = QueueUtils.get_queue_inst( + {"use_hitl_backend": True, "organization_id": organization_id} + ) + # API deployments don't have TTL, use system actor + conn.enqueue_with_ttl( + queue_name=q_name, + message=queue_result_json, + ttl_seconds=None, # No TTL for API deployments + actor_id=None, # System-initiated enqueue + ) + logger.info( + f"Pushed {file_name} to queue {q_name} with organization_id {organization_id}" + ) return connector_settings: dict[str, Any] = connector.connector_metadata @@ -836,7 +847,6 @@ def _push_to_queue( file_execution_id=file_execution_id, extracted_text=extracted_text, ) - # Add TTL metadata based on HITLSettings queue_result_obj.ttl_seconds = WorkflowUtil.get_hitl_ttl_seconds(workflow) @@ -850,15 +860,22 @@ def _push_to_queue( ) raise ValueError("Cannot enqueue empty JSON message") - conn = QueueUtils.get_queue_inst() - - # Use the TTL metadata that was already set in the QueueResult object + # Get organization_id for proper HITL queue filtering + organization_id = UserContext.get_organization_identifier() + conn = QueueUtils.get_queue_inst( + {"use_hitl_backend": True, "organization_id": organization_id} + ) + # Use TTL from workflow settings, system actor for workflow enqueue ttl_seconds = queue_result_obj.ttl_seconds - conn.enqueue_with_ttl( - queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds + queue_name=q_name, + message=queue_result_json, + ttl_seconds=ttl_seconds, + actor_id=None, # System-initiated enqueue + ) + logger.info( + f"Pushed {file_name} to queue {q_name} with organization_id {organization_id} and TTL {ttl_seconds}" ) - logger.info(f"Pushed {file_name} to queue {q_name} with file content") def _read_file_content_for_queue(self, input_file_path: str, file_name: str) -> str: """Read and encode file content for queue message. diff --git a/backend/workflow_manager/endpoint_v2/queue_utils.py b/backend/workflow_manager/endpoint_v2/queue_utils.py index 6e5e183cef..36e88ccfc5 100644 --- a/backend/workflow_manager/endpoint_v2/queue_utils.py +++ b/backend/workflow_manager/endpoint_v2/queue_utils.py @@ -13,6 +13,9 @@ logger = logging.getLogger(__name__) +# HITL connectors registry (initialized at module level, similar to Redis pattern) +hitl_connectors = {} + class QueueResultStatus(Enum): SUCCESS = "success" @@ -106,36 +109,46 @@ def get_hitl_queue_inst( @staticmethod def _import_hitl_connector(connector_name: str): - """Dynamically import HITL connector class. + """Get HITL connector class from registry (follows Redis pattern with lazy loading). Args: - connector_name: Name of the connector class to import + connector_name: Name of the connector class to get Returns: - The imported connector class + The connector class Raises: - ImportError: When the connector cannot be imported + ImportError: When the connector is not available """ - try: - from pluggable_apps.manual_review_v2.connectors import ( - HybridQueue, - PostgreSQLQueue, - ) + global hitl_connectors + + # Lazy initialization on first access (avoids circular imports) + if not hitl_connectors: + try: + from pluggable_apps.manual_review_v2.connectors import ( + HybridQueue, + PostgreSQLQueue, + ) - connectors = { - "PostgreSQLQueue": PostgreSQLQueue, - "HybridQueue": HybridQueue, - } + hitl_connectors = { + "PostgreSQLQueue": PostgreSQLQueue, + "HybridQueue": HybridQueue, + } + except ImportError as e: + logger.debug(f"HITL connectors not available: {e}") - if connector_name not in connectors: - raise ImportError(f"Unknown HITL connector: {connector_name}") + if not hitl_connectors: + raise ImportError( + "HITL connectors not available. Make sure 'pluggable_apps.manual_review_v2' is installed." + ) - return connectors[connector_name] + if connector_name not in hitl_connectors: + available_connectors = list(hitl_connectors.keys()) + raise ImportError( + f"Unknown HITL connector: {connector_name}. Available: {available_connectors}" + ) - except ImportError as e: - logger.debug(f"Failed to import HITL connector '{connector_name}': {e}") - raise + return hitl_connectors[connector_name] @staticmethod def calculate_remaining_ttl(enqueued_at: float, ttl_seconds: int) -> int | None: diff --git a/unstract/filesystem/src/unstract/filesystem/file_storage_config.py b/unstract/filesystem/src/unstract/filesystem/file_storage_config.py index 27440d2c1f..35bd306195 100644 --- a/unstract/filesystem/src/unstract/filesystem/file_storage_config.py +++ b/unstract/filesystem/src/unstract/filesystem/file_storage_config.py @@ -34,5 +34,5 @@ def get_provider(var_name: str, default: str = "minio") -> FileStorageProvider: FILE_STORAGE_CREDENTIALS_TO_ENV_NAME_MAPPING = { FileStorageType.WORKFLOW_EXECUTION: "WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS", FileStorageType.API_EXECUTION: "API_FILE_STORAGE_CREDENTIALS", - FileStorageType.HITL_FILES: "HITL_FILE_STORAGE_CREDENTIALS", + FileStorageType.HITL_FILES: "HITL_FILES_FILE_STORAGE_CREDENTIALS", } From 047e60119613e330a936298b42badb45cb88aaba Mon Sep 17 00:00:00 2001 From: vishnuszipstack Date: Fri, 26 Sep 2025 11:45:49 +0530 Subject: [PATCH 3/3] pr comment fixes] --- backend/sample.env | 2 ++ .../endpoint_v2/queue_utils.py | 31 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/backend/sample.env b/backend/sample.env index ff59a75a0f..dce1fa8743 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -200,3 +200,5 @@ RUNNER_POLLING_INTERVAL_SECONDS=2 # Default: 1800 seconds (30 minutes) # Examples: 900 (15 min), 1800 (30 min), 3600 (60 min) MIN_SCHEDULE_INTERVAL_SECONDS=1800 + +HITL_FILES_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}' diff --git a/backend/workflow_manager/endpoint_v2/queue_utils.py b/backend/workflow_manager/endpoint_v2/queue_utils.py index 36e88ccfc5..a0a06b85b7 100644 --- a/backend/workflow_manager/endpoint_v2/queue_utils.py +++ b/backend/workflow_manager/endpoint_v2/queue_utils.py @@ -13,9 +13,6 @@ logger = logging.getLogger(__name__) -# HITL connectors registry (initialized at module level, similar to Redis pattern) -hitl_connectors = {} - class QueueResultStatus(Enum): SUCCESS = "success" @@ -24,6 +21,9 @@ class QueueResultStatus(Enum): class QueueUtils: + # HITL connectors registry (lazy-loaded to avoid circular imports) + _hitl_connectors = {} + @staticmethod def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue: """Get queue connector instance based on configuration. @@ -71,7 +71,10 @@ def get_hitl_queue_inst( """ # For Redis backend, use default connector if backend == "redis": - return QueueUtils.get_queue_inst(connector_settings) + # Strip HITL flag to force default (non-HITL) connector path + non_hitl_settings = dict(connector_settings) + non_hitl_settings.pop("use_hitl_backend", None) + return QueueUtils.get_queue_inst(non_hitl_settings) # For PostgreSQL and Hybrid backends, try dynamic imports try: @@ -107,9 +110,9 @@ def get_hitl_queue_inst( detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}" ) - @staticmethod - def _import_hitl_connector(connector_name: str): - """Get HITL connector class from registry (follows Redis pattern with lazy loading). + @classmethod + def _import_hitl_connector(cls, connector_name: str): + """Get HITL connector class from registry (lazy-loaded to avoid circular imports). Args: connector_name: Name of the connector class to get @@ -120,35 +123,33 @@ def _import_hitl_connector(connector_name: str): Raises: ImportError: When the connector is not available """ - global hitl_connectors - # Lazy initialization on first access (avoids circular imports) - if not hitl_connectors: + if not cls._hitl_connectors: try: from pluggable_apps.manual_review_v2.connectors import ( HybridQueue, PostgreSQLQueue, ) - hitl_connectors = { + cls._hitl_connectors = { "PostgreSQLQueue": PostgreSQLQueue, "HybridQueue": HybridQueue, } except ImportError as e: logger.debug(f"HITL connectors not available: {e}") - if not hitl_connectors: + if not cls._hitl_connectors: raise ImportError( "HITL connectors not available. Make sure 'pluggable_apps.manual_review_v2' is installed." ) - if connector_name not in hitl_connectors: - available_connectors = list(hitl_connectors.keys()) + if connector_name not in cls._hitl_connectors: + available_connectors = list(cls._hitl_connectors.keys()) raise ImportError( f"Unknown HITL connector: {connector_name}. Available: {available_connectors}" ) - return hitl_connectors[connector_name] + return cls._hitl_connectors[connector_name] @staticmethod def calculate_remaining_ttl(enqueued_at: float, ttl_seconds: int) -> int | None: