UN-2663 [FEAT] moving away from redis hitl file handling improvements #1550
Conversation
Summary by CodeRabbit
Walkthrough

Adds HITL-aware queue selection and enqueueing: destination resolves organization_id to request HITL-capable queues; QueueUtils gains HITL routing, lazy connector imports/caching, and explicit HITL queue instantiation; filesystem adds HITL_FILES storage type and credential mapping; sample env exposes HITL storage credentials.
Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  participant Client
  participant Destination as endpoint_v2.destination
  participant QueueUtils
  participant HITLConnector as HITL Connector
  participant Queue as Queue Instance
  Client->>Destination: enqueue request
  Destination->>Destination: resolve organization_id
  Destination->>QueueUtils: get_queue_inst({use_hitl_backend: true, organization_id})
  alt HITL path
    QueueUtils->>QueueUtils: get_hitl_queue_inst(backend, settings)
    alt backend == PostgreSQL or Hybrid
      QueueUtils->>QueueUtils: _import_hitl_connector(name)
      QueueUtils->>HITLConnector: instantiate(settings)
      HITLConnector-->>QueueUtils: return Queue Instance
    else backend == Redis
      QueueUtils->>QueueUtils: clear HITL flag, delegate to standard connector
    else unknown/missing
      QueueUtils-->>Destination: raise UnstractQueueException
    end
  else non-HITL path
    QueueUtils->>QueueUtils: standard get_queue_inst flow
  end
  QueueUtils-->>Destination: Queue Instance
  Destination->>Queue: enqueue(payload, ttl_seconds?, actor_id=None, organization_id)
  Queue-->>Destination: enqueue result
  Destination-->>Client: acknowledgement
```
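Read as code, the routing in the diagram amounts to roughly the sketch below. It is illustrative only: the function and connector names are taken from the walkthrough, while the placeholder classes and the hard-coded backend choice are assumptions, not the project's actual implementation.

```python
# Illustrative sketch of the HITL-aware routing summarized above.
# The connector classes and the "standard" fallback are placeholders;
# only the branching mirrors the described behaviour.
from typing import Any


class UnstractQueueException(Exception):
    """Raised when a queue backend cannot be resolved."""


class _StubQueue:  # stand-in for a real queue connector
    def __init__(self, settings: dict[str, Any]):
        self.settings = settings


def _import_hitl_connector(name: str):
    # Stand-in for the lazy import/caching of pluggable HITL connectors.
    return {"PostgreSQLQueue": _StubQueue, "HybridQueue": _StubQueue}[name]


def get_hitl_queue_inst(backend: str, connector_settings: dict[str, Any] | None = None):
    settings = dict(connector_settings or {})
    if backend == "redis":
        # Redis has no dedicated HITL connector: drop the flag, reuse the default path.
        settings.pop("use_hitl_backend", None)
        return get_queue_inst(settings)
    if backend in ("postgresql", "hybrid"):
        cls = _import_hitl_connector("PostgreSQLQueue" if backend == "postgresql" else "HybridQueue")
        return cls(settings)
    raise UnstractQueueException(f"Unknown HITL queue backend '{backend}'")


def get_queue_inst(connector_settings: dict[str, Any] | None = None):
    settings = dict(connector_settings or {})
    if settings.get("use_hitl_backend"):
        # Backend name would normally come from configuration (e.g. HITL_QUEUE_BACKEND).
        return get_hitl_queue_inst("hybrid", settings)
    return _StubQueue(settings)  # standard, non-HITL connector


conn = get_queue_inst({"use_hitl_backend": True, "organization_id": "org-1"})
print(type(conn).__name__)
```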
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches

- ❌ Failed checks (1 inconclusive)
- ✅ Passed checks (2 passed)
- ✨ Finishing touches
- 🧪 Generate unit tests (beta)

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
Actionable comments posted: 2
🧹 Nitpick comments (4)
unstract/filesystem/src/unstract/filesystem/file_storage_config.py (1)
31-32: Confirm SHARED_TEMPORARY aligns with HITL data handling.

If HITL artifacts may include PII, confirm SHARED_TEMPORARY meets isolation, retention, and encryption expectations; otherwise consider a dedicated storage tier/bucket/namespace.
backend/workflow_manager/endpoint_v2/queue_utils.py (3)
28-41: Avoid mutable default for connector_settings

Use a None default and initialize inside the function to satisfy linters and prevent shared state.
```diff
-    def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue:
+    def get_queue_inst(connector_settings: dict[str, Any] | None = None) -> UnstractQueue:
+        if connector_settings is None:
+            connector_settings = {}
         """Get queue connector instance based on configuration.
```
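As a standalone illustration (not project code), this is the shared-state behaviour the nitpick is guarding against:

```python
# Demonstration of the mutable-default pitfall: the same dict object persists across calls.
def bad(settings: dict = {}):
    settings["calls"] = settings.get("calls", 0) + 1
    return settings


def good(settings: dict | None = None):
    settings = {} if settings is None else settings
    settings["calls"] = settings.get("calls", 0) + 1
    return settings


print(bad())   # {'calls': 1}
print(bad())   # {'calls': 2} -- state leaked from the previous call
print(good())  # {'calls': 1}
print(good())  # {'calls': 1} -- fresh dict each call
```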
53-56: Avoid mutable default for connector_settings (HITL path)

Same fix for get_hitl_queue_inst.
```diff
-    def get_hitl_queue_inst(
-        backend: str, connector_settings: dict[str, Any] = {}
-    ) -> UnstractQueue:
+    def get_hitl_queue_inst(
+        backend: str, connector_settings: dict[str, Any] | None = None
+    ) -> UnstractQueue:
+        if connector_settings is None:
+            connector_settings = {}
```
95-109: Prefer logger.exception and preserve the traceback with `from e`

Improves observability and complies with linter hints.
```diff
-        except ImportError as e:
-            logger.error(
-                f"HITL queue backend '{backend}' not available: {e}. "
-                f"Make sure 'pluggable_apps.manual_review_v2' is installed and configured."
-            )
-            raise UnstractQueueException(
-                detail=f"HITL queue backend '{backend}' not available. "
-                f"Please install the manual_review_v2 app or use 'redis' backend."
-            )
-        except Exception as e:
-            logger.error(f"Failed to initialize HITL queue backend '{backend}': {e}")
-            raise UnstractQueueException(
-                detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}"
-            )
+        except ImportError as e:
+            logger.exception(
+                "HITL queue backend '%s' not available: %s. Make sure 'pluggable_apps.manual_review_v2' is installed and configured.",
+                backend, e
+            )
+            raise UnstractQueueException(
+                detail=f"HITL queue backend '{backend}' not available. "
+                f"Please install the manual_review_v2 app or use 'redis' backend."
+            ) from e
+        except Exception as e:
+            logger.exception("Failed to initialize HITL queue backend '%s': %s", backend, e)
+            raise UnstractQueueException(
+                detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}"
+            ) from e
```
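For readers unfamiliar with the two linter hints, here is a generic, self-contained example (not the project's code) of what `logger.exception` and `raise ... from e` buy you:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class QueueSetupError(Exception):
    pass


def load_connector() -> None:
    try:
        __import__("missing_hitl_connector")  # deliberately absent module
    except ImportError as e:
        # logger.exception logs at ERROR level and appends the full traceback automatically.
        logger.exception("HITL connector import failed")
        # "from e" keeps the original ImportError attached as __cause__.
        raise QueueSetupError("HITL queue backend not available") from e


try:
    load_connector()
except QueueSetupError as err:
    print("caused by:", type(err.__cause__).__name__)  # ModuleNotFoundError (an ImportError subclass)
```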
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (4)
- backend/workflow_manager/endpoint_v2/destination.py (2 hunks)
- backend/workflow_manager/endpoint_v2/queue_utils.py (4 hunks)
- unstract/filesystem/src/unstract/filesystem/file_storage_config.py (1 hunks)
- unstract/filesystem/src/unstract/filesystem/file_storage_types.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
backend/workflow_manager/endpoint_v2/queue_utils.py
54-54: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
96-99: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
100-103: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
104-104: Do not catch blind exception: Exception
(BLE001)
105-105: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
106-108: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
107-107: Use explicit conversion flag
Replace with conversion flag
(RUF010)
141-143: Avoid specifying long messages outside the exception class
(TRY003)
147-149: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
 
🔇 Additional comments (2)
unstract/filesystem/src/unstract/filesystem/file_storage_types.py (1)
7-7: Add brief usage note or doc where this enum is documented.

LGTM. Please ensure HITL_FILES is referenced in any docs/samples that enumerate FileStorageType values.
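For orientation, a hedged sketch of what the enum addition looks like in spirit; the member value and the neighbouring members are assumptions, not the actual contents of file_storage_types.py:

```python
# Hypothetical sketch only -- the real FileStorageType enum in
# unstract/filesystem/.../file_storage_types.py may use different members/values.
from enum import Enum


class FileStorageType(Enum):
    PERMANENT = "permanent"                # assumed pre-existing member
    SHARED_TEMPORARY = "shared_temporary"  # assumed pre-existing member
    HITL_FILES = "hitl_files"              # member added by this PR (value assumed)


# Callers would pick the HITL storage tier via the enum rather than a raw string.
print(FileStorageType.HITL_FILES.name, FileStorageType.HITL_FILES.value)
```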
backend/workflow_manager/endpoint_v2/destination.py (1)
802-816: Verify HITL queue connector API supports enqueue_with_ttl; prefer self.organization_id
- UnstractQueue does not declare enqueue_with_ttl and the repo has no implementations — confirm pluggable HITL connectors (HybridQueue/PostgreSQLQueue) expose enqueue_with_ttl(queue_name, message, ttl_seconds=None, actor_id=None) or add the method/guard the call to avoid AttributeError.
 - Replace re-fetch of org id with self.organization_id in backend/workflow_manager/endpoint_v2/destination.py (occurrences near lines ~803 and ~864):
```diff
-            organization_id = UserContext.get_organization_identifier()
+            organization_id = self.organization_id
```
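Regarding the first point above, until the connector API is confirmed, one hedged way to guard the call at the call site; `safe_enqueue` and the plain `enqueue` fallback are hypothetical, not part of the existing codebase:

```python
# Hypothetical defensive wrapper: only worthwhile if some connectors
# turn out not to implement enqueue_with_ttl.
from typing import Any


def safe_enqueue(conn: Any, queue_name: str, message: str,
                 ttl_seconds: int | None = None, actor_id: str | None = None):
    if hasattr(conn, "enqueue_with_ttl"):
        return conn.enqueue_with_ttl(
            queue_name=queue_name,
            message=message,
            ttl_seconds=ttl_seconds,
            actor_id=actor_id,
        )
    # Fallback assumes the base connector exposes a plain enqueue(queue_name, message).
    return conn.enqueue(queue_name, message)
```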
```python
        if is_hitl:
            # Use HITL-specific queue backend
            hitl_backend = getattr(settings, "HITL_QUEUE_BACKEND", "hybrid")
```
NIT: Specify the default value for this setting in the config file itself and directly use the setting here
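A sketch of what this could look like, assuming a standard Django settings module; only HITL_QUEUE_BACKEND and its "hybrid" default come from the quoted code, the rest is illustrative:

```python
# settings.py (sketch): declare the default once, next to other queue configuration.
import os

HITL_QUEUE_BACKEND = os.environ.get("HITL_QUEUE_BACKEND", "hybrid")
```

The call site can then read `settings.HITL_QUEUE_BACKEND` directly instead of repeating the default via `getattr(settings, "HITL_QUEUE_BACKEND", "hybrid")`.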
```python
        except ImportError as e:
            logger.error(
                f"HITL queue backend '{backend}' not available: {e}. "
                f"Make sure 'pluggable_apps.manual_review_v2' is installed and configured."
            )
            raise UnstractQueueException(
                detail=f"HITL queue backend '{backend}' not available. "
                f"Please install the manual_review_v2 app or use 'redis' backend."
            )
        except Exception as e:
            logger.error(f"Failed to initialize HITL queue backend '{backend}': {e}")
            raise UnstractQueueException(
                detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}"
            )
```
NIT: Might not be necessary to log the error here since the middleware takes care of it already in case of an error. Also in the given pattern, we print the pythonic error and we might risk showing it to the user
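In that spirit, a generic sketch (names are illustrative, not the project's classes) of keeping the raw Python error out of the user-facing detail while still preserving it for logs via exception chaining:

```python
# Sketch: user-facing detail stays generic; the internal error travels via __cause__
# where middleware/logging can pick it up.
class QueueError(Exception):
    def __init__(self, detail: str):
        super().__init__(detail)
        self.detail = detail


def init_backend(backend: str):
    try:
        raise ConnectionError("password authentication failed for user 'unstract'")
    except Exception as e:
        raise QueueError(f"Failed to initialize HITL queue backend '{backend}'.") from e


try:
    init_backend("hybrid")
except QueueError as err:
    print(err.detail)           # safe to surface to the user
    print(repr(err.__cause__))  # internal detail, for logs only
```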
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/workflow_manager/endpoint_v2/queue_utils.py (1)
54-78: Avoid mutable default for connector_settings

Using `{}` as a default argument means every call without an explicit settings dict shares the same object, which can lead to surprising cross-call mutations if a connector modifies the dict. Please default to `None` and instantiate inside the function.

```diff
-        backend: str, connector_settings: dict[str, Any] = {}
+        backend: str, connector_settings: dict[str, Any] | None = None
     ) -> UnstractQueue:
@@
-        if backend == "redis":
+        connector_settings = connector_settings or {}
+        if backend == "redis":
             # Strip HITL flag to force default (non-HITL) connector path
             non_hitl_settings = dict(connector_settings)
```

Based on static analysis hints.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (2)
- backend/sample.env (1 hunks)
- backend/workflow_manager/endpoint_v2/queue_utils.py (3 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
backend/workflow_manager/endpoint_v2/queue_utils.py
25-25: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
28-28: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
54-54: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
99-102: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
103-106: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
107-107: Do not catch blind exception: Exception
(BLE001)
108-108: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
109-111: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
110-110: Use explicit conversion flag
Replace with conversion flag
(RUF010)
142-144: Avoid specifying long messages outside the exception class
(TRY003)
148-150: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
 
🔇 Additional comments (1)
backend/sample.env (1)
204-204: LGTM: HITL storage credentials sample entry

Thanks for wiring up the sample credentials; this keeps the sample env aligned with the new HITL_FILES storage configuration.
```python
        if backend == "redis":
            # Strip HITL flag to force default (non-HITL) connector path
            non_hitl_settings = dict(connector_settings)
            non_hitl_settings.pop("use_hitl_backend", None)
            return QueueUtils.get_queue_inst(non_hitl_settings)

        # For PostgreSQL and Hybrid backends, try dynamic imports
        try:
            if backend == "postgresql":
                connector_class = QueueUtils._import_hitl_connector("PostgreSQLQueue")
                return connector_class(connector_settings)

            elif backend == "hybrid":
                connector_class = QueueUtils._import_hitl_connector("HybridQueue")
                return connector_class(connector_settings)

            else:
                logger.warning(
                    f"Unknown HITL queue backend '{backend}'. "
                    f"Valid options: postgresql, hybrid, redis. "
                    f"Attempting fallback to hybrid."
                )
                connector_class = QueueUtils._import_hitl_connector("HybridQueue")
                return connector_class(connector_settings)
```
Normalize HITL backend selection to avoid wrong connector path
HITL_QUEUE_BACKEND comes from environment configuration, so values like REDIS, Redis, or PostgreSQL are plausible. Because the current check is case-sensitive, anything other than exact lowercase redis/postgresql/hybrid slides into the “unknown” branch, skipping the Redis fallback and, in many deployments, raising an exception instead of enqueueing. Please normalize the backend string (e.g., normalized_backend = (backend or "").lower()) before branching so all common spellings hit the intended code path.
```diff
-        if backend == "redis":
+        normalized_backend = (backend or "").lower()
+        if normalized_backend == "redis":
             # Strip HITL flag to force default (non-HITL) connector path
             non_hitl_settings = dict(connector_settings)
             non_hitl_settings.pop("use_hitl_backend", None)
             return QueueUtils.get_queue_inst(non_hitl_settings)

         # For PostgreSQL and Hybrid backends, try dynamic imports
         try:
-            if backend == "postgresql":
+            if normalized_backend == "postgresql":
                 connector_class = QueueUtils._import_hitl_connector("PostgreSQLQueue")
                 return connector_class(connector_settings)

-            elif backend == "hybrid":
+            elif normalized_backend == "hybrid":
                 connector_class = QueueUtils._import_hitl_connector("HybridQueue")
                 return connector_class(connector_settings)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
        normalized_backend = (backend or "").lower()
        if normalized_backend == "redis":
            # Strip HITL flag to force default (non-HITL) connector path
            non_hitl_settings = dict(connector_settings)
            non_hitl_settings.pop("use_hitl_backend", None)
            return QueueUtils.get_queue_inst(non_hitl_settings)

        # For PostgreSQL and Hybrid backends, try dynamic imports
        try:
            if normalized_backend == "postgresql":
                connector_class = QueueUtils._import_hitl_connector("PostgreSQLQueue")
                return connector_class(connector_settings)

            elif normalized_backend == "hybrid":
                connector_class = QueueUtils._import_hitl_connector("HybridQueue")
                return connector_class(connector_settings)

            else:
                logger.warning(
                    f"Unknown HITL queue backend '{backend}'. "
                    f"Valid options: postgresql, hybrid, redis. "
                    f"Attempting fallback to hybrid."
                )
                connector_class = QueueUtils._import_hitl_connector("HybridQueue")
                return connector_class(connector_settings)
```
🤖 Prompt for AI Agents
In backend/workflow_manager/endpoint_v2/queue_utils.py around lines 73 to 97,
the branch logic uses case-sensitive backend string checks causing values like
"Redis" or "PostgreSQL" to fall through to the unknown branch; fix by
normalizing the incoming backend value first (e.g., normalized_backend =
(backend or "").lower()) and then use normalized_backend in all subsequent
comparisons and logging, ensuring the Redis/PostgreSQL/Hybrid branches and the
fallback behave correctly even when env vars differ in case.
NIT comments can be resolved later
```python
            UnstractQueueException: When HITL connectors are not available
        """
        # For Redis backend, use default connector
        if backend == "redis":
```
NIT: @vishnuszipstack can we keep this as a constant to keep uniformity in the codebase?
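A hedged sketch of the kind of constants being suggested; the names are assumptions and the project may already have a preferred place for them:

```python
# Hypothetical backend-name constants so literals like "redis" are not repeated
# throughout queue_utils.py.
class HITLQueueBackend:
    REDIS = "redis"
    POSTGRESQL = "postgresql"
    HYBRID = "hybrid"


# Usage sketch:
# if backend == HITLQueueBackend.REDIS:
#     ...
```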
LGTM
Actionable comments posted: 2
🧹 Nitpick comments (1)
backend/sample.env (1)
204-204: Consider adding a documentation comment.

The new `HITL_FILES_FILE_STORAGE_CREDENTIALS` environment variable lacks an explanatory comment. Adding a brief comment (similar to those for other sections like "X2Text Service" or "Prompt Studio") would improve clarity for developers.

Example:

```diff
+# HITL (Human-In-The-Loop) Files Storage
 HITL_FILES_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'
```

Operational note: While reusing the same MinIO instance and credentials is appropriate for this sample configuration, production deployments should consider isolating HITL file storage with separate credentials or dedicated storage to enhance security boundaries and enable independent scaling/lifecycle management.
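Since the value is a JSON string, a small hedged sketch of how such a variable is typically consumed; the project's actual loader in unstract.filesystem may differ:

```python
import json
import os

# Illustrative parsing of the sample value; real credential handling may differ.
raw = os.environ.get(
    "HITL_FILES_FILE_STORAGE_CREDENTIALS",
    '{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", '
    '"key": "minio", "secret": "minio123"}}',
)
config = json.loads(raw)
print(config["provider"], config["credentials"]["endpoint_url"])
```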
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (2)
- backend/sample.env (1 hunks)
- backend/workflow_manager/endpoint_v2/destination.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
 
```python
            # Get organization_id for proper HITL queue filtering
            organization_id = UserContext.get_organization_identifier()
            conn = QueueUtils.get_queue_inst(
                {"use_hitl_backend": True, "organization_id": organization_id}
            )
            # API deployments don't have TTL, use system actor
            conn.enqueue_with_ttl(
                queue_name=q_name,
                message=queue_result_json,
                ttl_seconds=None,  # No TTL for API deployments
                actor_id=None,  # System-initiated enqueue
            )
            logger.info(
                f"Pushed {file_name} to queue {q_name} with organization_id {organization_id}"
            )
```
🛠️ Refactor suggestion | 🟠 Major
Use cached self.organization_id instead of fetching again.
The organization_id is already fetched and stored in self.organization_id at line 79 during initialization. Fetching it again here is redundant and could theoretically lead to inconsistency if the user context changes between initialization and this call.
Apply this diff:
```diff
-            # Get organization_id for proper HITL queue filtering
-            organization_id = UserContext.get_organization_identifier()
             conn = QueueUtils.get_queue_inst(
-                {"use_hitl_backend": True, "organization_id": organization_id}
+                {"use_hitl_backend": True, "organization_id": self.organization_id}
             )
             # API deployments don't have TTL, use system actor
             conn.enqueue_with_ttl(
                 queue_name=q_name,
                 message=queue_result_json,
                 ttl_seconds=None,  # No TTL for API deployments
                 actor_id=None,  # System-initiated enqueue
             )
             logger.info(
-                f"Pushed {file_name} to queue {q_name} with organization_id {organization_id}"
+                f"Pushed {file_name} to queue {q_name} with organization_id {self.organization_id}"
             )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
            conn = QueueUtils.get_queue_inst(
                {"use_hitl_backend": True, "organization_id": self.organization_id}
            )
            # API deployments don't have TTL, use system actor
            conn.enqueue_with_ttl(
                queue_name=q_name,
                message=queue_result_json,
                ttl_seconds=None,  # No TTL for API deployments
                actor_id=None,  # System-initiated enqueue
            )
            logger.info(
                f"Pushed {file_name} to queue {q_name} with organization_id {self.organization_id}"
            )
```
🤖 Prompt for AI Agents
In backend/workflow_manager/endpoint_v2/destination.py around lines 873 to 887,
the code redundantly calls UserContext.get_organization_identifier() even though
self.organization_id was set at initialization; replace that call with using
self.organization_id (ensure it is used directly when building the
QueueUtils.get_queue_inst call and in the log message) so the method uses the
cached organization identifier rather than re-fetching from UserContext.
```diff
             # Get organization_id for proper HITL queue filtering
             organization_id = UserContext.get_organization_identifier()
             conn = QueueUtils.get_queue_inst(
                 {"use_hitl_backend": True, "organization_id": organization_id}
             )
             # Use TTL from workflow settings, system actor for workflow enqueue
             ttl_seconds = queue_result_obj.ttl_seconds

             conn.enqueue_with_ttl(
-                queue_name=q_name, message=queue_result_json, ttl_seconds=ttl_seconds
+                queue_name=q_name,
+                message=queue_result_json,
+                ttl_seconds=ttl_seconds,
+                actor_id=None,  # System-initiated enqueue
             )
             logger.info(
                 f"Pushed {file_name} to queue {q_name} with organization_id {organization_id} and TTL {ttl_seconds}"
             )
```
🛠️ Refactor suggestion | 🟠 Major
Use cached self.organization_id instead of fetching again.
Same issue as in the API path above. The organization_id is already available as self.organization_id from initialization (line 79) and should be reused here.
Apply this diff:
```diff
-            # Get organization_id for proper HITL queue filtering
-            organization_id = UserContext.get_organization_identifier()
             conn = QueueUtils.get_queue_inst(
-                {"use_hitl_backend": True, "organization_id": organization_id}
+                {"use_hitl_backend": True, "organization_id": self.organization_id}
             )
             # Use TTL from workflow settings, system actor for workflow enqueue
             ttl_seconds = queue_result_obj.ttl_seconds
             conn.enqueue_with_ttl(
                 queue_name=q_name,
                 message=queue_result_json,
                 ttl_seconds=ttl_seconds,
                 actor_id=None,  # System-initiated enqueue
             )
             logger.info(
-                f"Pushed {file_name} to queue {q_name} with organization_id {organization_id} and TTL {ttl_seconds}"
+                f"Pushed {file_name} to queue {q_name} with organization_id {self.organization_id} and TTL {ttl_seconds}"
             )
```

🤖 Prompt for AI Agents
In backend/workflow_manager/endpoint_v2/destination.py around lines 937 to 952,
the code re-fetches the organization id via
UserContext.get_organization_identifier() even though self.organization_id was
set at initialization; replace the local call and assign organization_id =
self.organization_id (or use self.organization_id directly) when building the
QueueUtils.get_queue_inst call and in the log message so the cached value is
reused and the extra context lookup is removed.
Test Results

Summary

- Runner Tests - Full Report
- SDK1 Tests - Full Report
What
Why
How
Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled.)
Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
Screenshots
Checklist
I have read and understood the Contribution Guidelines.