From a780b583d8eea6b1517bb8f9b1b7c032aec6c399 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 9 Oct 2025 21:55:44 +0530 Subject: [PATCH] UN-2865 [FIX] Remove premature COMPLETED status update in general worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fix addresses a critical bug where the general worker incorrectly marked workflow executions as COMPLETED immediately after orchestrating async file processing, while files were still being processed. Changes: - Removed WorkflowExecutionStatusUpdate that set status to COMPLETED - Removed incorrect execution_time update (only orchestration time) - Removed incorrect total_files calculation - Updated comments to clarify orchestration vs execution completion - Updated logging to reflect async orchestration behavior The callback worker now properly handles setting the final COMPLETED or ERROR status after all files finish processing, matching the pattern used by the API deployment worker. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- workers/general/tasks.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/workers/general/tasks.py b/workers/general/tasks.py index bbc0d7a1c8..a34d60cbfb 100644 --- a/workers/general/tasks.py +++ b/workers/general/tasks.py @@ -17,7 +17,6 @@ from shared.data.models import ( CallbackTaskData, WorkerTaskResponse, - WorkflowExecutionStatusUpdate, ) from shared.enums.status_enums import PipelineStatus from shared.enums.task_enums import TaskName @@ -270,24 +269,14 @@ def async_execute_bin_general( **kwargs, ) - # Calculate execution time + # Calculate orchestration time (not total execution time - callback worker calculates that) execution_time = execution_result.get("execution_time", 0) - # Update execution status to completed - # Only include total_files if we have files to avoid overwriting with 0 - update_request = WorkflowExecutionStatusUpdate( - execution_id=execution_id, - status=ExecutionStatus.COMPLETED.value, - execution_time=execution_time, - total_files=len(hash_values_of_files) if hash_values_of_files else None, - ) - - api_client.update_workflow_execution_status(**update_request.to_dict()) - + # Status remains EXECUTING - callback worker will set COMPLETED/ERROR after all files finish # Cache cleanup handled by callback worker logger.info( - f"Successfully completed general workflow execution {execution_id}" + f"Successfully orchestrated general workflow execution {execution_id} (files processing asynchronously)" ) response = WorkerTaskResponse.success_response(