Skip to content
2 changes: 2 additions & 0 deletions backend/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,5 @@ RUNNER_POLLING_INTERVAL_SECONDS=2
# Default: 1800 seconds (30 minutes)
# Examples: 900 (15 min), 1800 (30 min), 3600 (60 min)
MIN_SCHEDULE_INTERVAL_SECONDS=1800

HITL_FILES_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'
38 changes: 28 additions & 10 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ def handle_output(
"""Handle the output based on the connection type."""
connection_type = self.endpoint.connection_type
tool_execution_result: str | None = None

if connection_type == WorkflowEndpoint.ConnectionType.FILESYSTEM:
self.copy_output_to_output_directory()

Expand Down Expand Up @@ -871,9 +870,21 @@ def _push_to_queue(
).to_dict()

queue_result_json = json.dumps(queue_result)
conn = QueueUtils.get_queue_inst()
conn.enqueue(queue_name=q_name, message=queue_result_json)
logger.info(f"Pushed {file_name} to queue {q_name} with file content")
# Get organization_id for proper HITL queue filtering
organization_id = UserContext.get_organization_identifier()
conn = QueueUtils.get_queue_inst(
{"use_hitl_backend": True, "organization_id": organization_id}
)
# API deployments don't have TTL, use system actor
conn.enqueue_with_ttl(
queue_name=q_name,
message=queue_result_json,
ttl_seconds=None, # No TTL for API deployments
actor_id=None, # System-initiated enqueue
)
logger.info(
f"Pushed {file_name} to queue {q_name} with organization_id {organization_id}"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use cached self.organization_id instead of fetching again.

The organization_id is already fetched and stored in self.organization_id at line 79 during initialization. Fetching it again here is redundant and could theoretically lead to inconsistency if the user context changes between initialization and this call.

Apply this diff:

-            # Get organization_id for proper HITL queue filtering
-            organization_id = UserContext.get_organization_identifier()
             conn = QueueUtils.get_queue_inst(
-                {"use_hitl_backend": True, "organization_id": organization_id}
+                {"use_hitl_backend": True, "organization_id": self.organization_id}
             )
             # API deployments don't have TTL, use system actor
             conn.enqueue_with_ttl(
                 queue_name=q_name,
                 message=queue_result_json,
                 ttl_seconds=None,  # No TTL for API deployments
                 actor_id=None,  # System-initiated enqueue
             )
             logger.info(
-                f"Pushed {file_name} to queue {q_name} with organization_id {organization_id}"
+                f"Pushed {file_name} to queue {q_name} with organization_id {self.organization_id}"
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Get organization_id for proper HITL queue filtering
organization_id = UserContext.get_organization_identifier()
conn = QueueUtils.get_queue_inst(
{"use_hitl_backend": True, "organization_id": organization_id}
)
# API deployments don't have TTL, use system actor
conn.enqueue_with_ttl(
queue_name=q_name,
message=queue_result_json,
ttl_seconds=None, # No TTL for API deployments
actor_id=None, # System-initiated enqueue
)
logger.info(
f"Pushed {file_name} to queue {q_name} with organization_id {organization_id}"
)
conn = QueueUtils.get_queue_inst(
{"use_hitl_backend": True, "organization_id": self.organization_id}
)
# API deployments don't have TTL, use system actor
conn.enqueue_with_ttl(
queue_name=q_name,
message=queue_result_json,
ttl_seconds=None, # No TTL for API deployments
actor_id=None, # System-initiated enqueue
)
logger.info(
f"Pushed {file_name} to queue {q_name} with organization_id {self.organization_id}"
)
🤖 Prompt for AI Agents
In backend/workflow_manager/endpoint_v2/destination.py around lines 873 to 887,
the code redundantly calls UserContext.get_organization_identifier() even though
self.organization_id was set at initialization; replace that call with using
self.organization_id (ensure it is used directly when building the
QueueUtils.get_queue_inst call and in the log message) so the method uses the
cached organization identifier rather than re-fetching from UserContext.

return
connector_settings: dict[str, Any] = connector.connector_metadata

Expand Down Expand Up @@ -923,15 +934,22 @@ def _push_to_queue(
)
raise ValueError("Cannot enqueue empty JSON message")

conn = QueueUtils.get_queue_inst()

# Use the TTL metadata that was already set in the QueueResult object
# Get organization_id for proper HITL queue filtering
organization_id = UserContext.get_organization_identifier()
conn = QueueUtils.get_queue_inst(
{"use_hitl_backend": True, "organization_id": organization_id}
)
# Use TTL from workflow settings, system actor for workflow enqueue
ttl_seconds = queue_result_obj.ttl_seconds

conn.enqueue_with_ttl(
queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds
queue_name=q_name,
message=queue_result_json,
ttl_seconds=ttl_seconds,
actor_id=None, # System-initiated enqueue
)
logger.info(
f"Pushed {file_name} to queue {q_name} with organization_id {organization_id} and TTL {ttl_seconds}"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use cached self.organization_id instead of fetching again.

Same issue as in the API path above. The organization_id is already available as self.organization_id from initialization (line 79) and should be reused here.

Apply this diff:

-            # Get organization_id for proper HITL queue filtering
-            organization_id = UserContext.get_organization_identifier()
             conn = QueueUtils.get_queue_inst(
-                {"use_hitl_backend": True, "organization_id": organization_id}
+                {"use_hitl_backend": True, "organization_id": self.organization_id}
             )
             # Use TTL from workflow settings, system actor for workflow enqueue
             ttl_seconds = queue_result_obj.ttl_seconds
             conn.enqueue_with_ttl(
                 queue_name=q_name,
                 message=queue_result_json,
                 ttl_seconds=ttl_seconds,
                 actor_id=None,  # System-initiated enqueue
             )
             logger.info(
-                f"Pushed {file_name} to queue {q_name} with organization_id {organization_id} and TTL {ttl_seconds}"
+                f"Pushed {file_name} to queue {q_name} with organization_id {self.organization_id} and TTL {ttl_seconds}"
             )
🤖 Prompt for AI Agents
In backend/workflow_manager/endpoint_v2/destination.py around lines 937 to 952,
the code re-fetches the organization id via
UserContext.get_organization_identifier() even though self.organization_id was
set at initialization; replace the local call and assign organization_id =
self.organization_id (or use self.organization_id directly) when building the
QueueUtils.get_queue_inst call and in the log message so the cached value is
reused and the extra context lookup is removed.

logger.info(f"Pushed {file_name} to queue {q_name} with file content")

def _read_file_content_for_queue(self, input_file_path: str, file_name: str) -> str:
"""Read and encode file content for queue message.
Expand Down
120 changes: 120 additions & 0 deletions backend/workflow_manager/endpoint_v2/queue_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from enum import Enum
from typing import Any

from django.conf import settings
from utils.constants import Common
from workflow_manager.endpoint_v2.exceptions import UnstractQueueException

Expand All @@ -20,8 +21,25 @@ class QueueResultStatus(Enum):


class QueueUtils:
# HITL connectors registry (lazy-loaded to avoid circular imports)
_hitl_connectors = {}

@staticmethod
def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue:
"""Get queue connector instance based on configuration.

For HITL operations, this can return PostgreSQL, Hybrid, or Redis connectors
based on the HITL_QUEUE_BACKEND setting.
"""
# Check if this is for HITL operations
is_hitl = connector_settings.get("use_hitl_backend", False)

if is_hitl:
# Use HITL-specific queue backend
hitl_backend = getattr(settings, "HITL_QUEUE_BACKEND", "hybrid")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Specify the default value for this setting in the config file itself and directly use the setting here

return QueueUtils.get_hitl_queue_inst(hitl_backend, connector_settings)

# Default behavior for non-HITL operations
if not queue_connectors:
raise UnstractQueueException(detail="Queue connector not exists")
queue_connector_key = next(iter(queue_connectors))
Expand All @@ -31,6 +49,108 @@ def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue:
connector_class: UnstractQueue = connector(connector_settings)
return connector_class

@staticmethod
def get_hitl_queue_inst(
    backend: str, connector_settings: dict[str, Any] = {}
) -> UnstractQueue:
    """Get HITL-specific queue connector instance with dynamic imports.

    This method uses dynamic imports to avoid hard dependencies on the
    manual_review_v2 pluggable app, allowing graceful degradation when
    the app is not available.

    Args:
        backend: Backend type ('postgresql', 'hybrid', 'redis').
            Matched case-insensitively, since the value typically comes
            from environment configuration (e.g. "Redis", "PostgreSQL").
        connector_settings: Connector configuration

    Returns:
        Configured queue connector instance

    Raises:
        UnstractQueueException: When HITL connectors are not available
            or fail to initialize
    """
    # Normalize so env-provided spellings like "Redis" or "PostgreSQL"
    # select the intended backend instead of sliding into the
    # unknown-backend fallback (which may raise instead of enqueueing).
    normalized_backend = (backend or "").lower()

    # For Redis backend, use default connector
    if normalized_backend == "redis":
        # Strip HITL flag to force default (non-HITL) connector path
        non_hitl_settings = dict(connector_settings)
        non_hitl_settings.pop("use_hitl_backend", None)
        return QueueUtils.get_queue_inst(non_hitl_settings)

    # For PostgreSQL and Hybrid backends, try dynamic imports
    try:
        if normalized_backend == "postgresql":
            connector_class = QueueUtils._import_hitl_connector("PostgreSQLQueue")
            return connector_class(connector_settings)

        elif normalized_backend == "hybrid":
            connector_class = QueueUtils._import_hitl_connector("HybridQueue")
            return connector_class(connector_settings)

        else:
            logger.warning(
                f"Unknown HITL queue backend '{backend}'. "
                f"Valid options: postgresql, hybrid, redis. "
                f"Attempting fallback to hybrid."
            )
            connector_class = QueueUtils._import_hitl_connector("HybridQueue")
            return connector_class(connector_settings)

    except ImportError as e:
        logger.error(
            f"HITL queue backend '{backend}' not available: {e}. "
            f"Make sure 'pluggable_apps.manual_review_v2' is installed and configured."
        )
        raise UnstractQueueException(
            detail=f"HITL queue backend '{backend}' not available. "
            f"Please install the manual_review_v2 app or use 'redis' backend."
        )
    except Exception as e:
        logger.error(f"Failed to initialize HITL queue backend '{backend}': {e}")
        raise UnstractQueueException(
            detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}"
        )
Comment on lines +98 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Might not be necessary to log the error here since the middleware takes care of it already in case of an error. Also in the given pattern, we print the pythonic error and we might risk showing it to the user


@classmethod
def _import_hitl_connector(cls, connector_name: str):
    """Look up an HITL connector class in the lazily built registry.

    The registry is populated on first access so that importing this
    module never triggers a circular import with the pluggable
    manual_review_v2 app.

    Args:
        connector_name: Registry key of the desired connector class

    Returns:
        The connector class

    Raises:
        ImportError: When the manual_review_v2 app or the requested
            connector is unavailable
    """
    if not cls._hitl_connectors:
        # First access: attempt to fill the registry from the pluggable app.
        try:
            from pluggable_apps.manual_review_v2.connectors import (
                HybridQueue,
                PostgreSQLQueue,
            )
        except ImportError as e:
            logger.debug(f"HITL connectors not available: {e}")
        else:
            cls._hitl_connectors = {
                "PostgreSQLQueue": PostgreSQLQueue,
                "HybridQueue": HybridQueue,
            }

    registry = cls._hitl_connectors
    if not registry:
        raise ImportError(
            "HITL connectors not available. Make sure 'pluggable_apps.manual_review_v2' is installed."
        )

    if connector_name not in registry:
        available_connectors = list(registry.keys())
        raise ImportError(
            f"Unknown HITL connector: {connector_name}. Available: {available_connectors}"
        )

    return registry[connector_name]

@staticmethod
def calculate_remaining_ttl(enqueued_at: float, ttl_seconds: int) -> int | None:
"""Calculate remaining TTL based on original enqueue time and TTL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ def get_provider(var_name: str, default: str = "minio") -> FileStorageProvider:
# Maps each logical file-storage type to the physical storage tier used
# for it; HITL files share the same shared-temporary tier as workflow
# and API execution artifacts.
STORAGE_MAPPING = {
FileStorageType.WORKFLOW_EXECUTION: StorageType.SHARED_TEMPORARY,
FileStorageType.API_EXECUTION: StorageType.SHARED_TEMPORARY,
FileStorageType.HITL_FILES: StorageType.SHARED_TEMPORARY, # Use shared temporary for HITL files
}

# Maps each file-storage type to the name of the environment variable
# that holds its storage-provider credentials.
FILE_STORAGE_CREDENTIALS_TO_ENV_NAME_MAPPING = {
FileStorageType.WORKFLOW_EXECUTION: "WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS",
FileStorageType.API_EXECUTION: "API_FILE_STORAGE_CREDENTIALS",
FileStorageType.HITL_FILES: "HITL_FILES_FILE_STORAGE_CREDENTIALS",
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
class FileStorageType(Enum):
    """Logical categories of file storage referenced across the backend."""

    WORKFLOW_EXECUTION = "WORKFLOW_EXECUTION"
    API_EXECUTION = "API_EXECUTION"
    # Storage category for human-in-the-loop (HITL) review files.
    HITL_FILES = "HITL_FILES"