From 318bb8e19c53ccd0e61fdc3c6dd7a8b82b0f01da Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 14:16:26 -0700 Subject: [PATCH 01/11] just call deserialized during dequeue --- dbos/queue.go | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/dbos/queue.go b/dbos/queue.go index f323887..5897de3 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -1,10 +1,7 @@ package dbos import ( - "bytes" "context" - "encoding/base64" - "encoding/gob" "log/slog" "math" "math/rand" @@ -227,18 +224,12 @@ func (qr *queueRunner) run(ctx *dbosContext) { continue } - // Deserialize input + // Deserialize input using the centralized deserialize function var input any if len(workflow.input) > 0 { - inputBytes, err := base64.StdEncoding.DecodeString(workflow.input) + input, err = deserialize(&workflow.input) if err != nil { - qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err) - continue - } - buf := bytes.NewBuffer(inputBytes) - dec := gob.NewDecoder(buf) - if err := dec.Decode(&input); err != nil { - qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err) + qr.logger.Error("failed to deserialize input for workflow", "workflow_id", workflow.id, "error", err) continue } } From 0259dd66d29ae49034586f542b75d5731ebd7dec Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 14:17:16 -0700 Subject: [PATCH 02/11] decode during recovery (instead of getting it decoded from list workflows --- dbos/recovery.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/dbos/recovery.go b/dbos/recovery.go index ba51a6d..48277c7 100644 --- a/dbos/recovery.go +++ b/dbos/recovery.go @@ -1,9 +1,5 @@ package dbos -import ( - "strings" -) - func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]WorkflowHandle[any], error) { workflowHandles := make([]WorkflowHandle[any], 0) // List pending workflows for the executors @@ -18,11 +14,21 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow } for _, workflow := range pendingWorkflows { - if inputStr, ok := workflow.Input.(string); ok { - if strings.Contains(inputStr, "Failed to decode") { - ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name) + // Deserialize the workflow input + var decodedInput any + if workflow.Input != nil { + inputStr, ok := workflow.Input.(string) + if !ok { + ctx.logger.Warn("Skipping workflow recovery: input is not an encoded string", "workflow_id", workflow.ID, "name", workflow.Name, "input_type", workflow.Input) continue } + if inputStr != "" { + decodedInput, err = deserialize(&inputStr) + if err != nil { + ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name, "error", err) + continue + } + } } if workflow.QueueName != "" { @@ -59,7 +65,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow WithWorkflowID(workflow.ID), } // Create a workflow context from the executor context - handle, err := registeredWorkflow.wrappedFunction(ctx, workflow.Input, opts...) + handle, err := registeredWorkflow.wrappedFunction(ctx, decodedInput, opts...) if err != nil { return nil, err } From bf35e785987928026d74cf546652c9167cc157a3 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 14:29:35 -0700 Subject: [PATCH 03/11] lift encoding outside of the system db --- dbos/system_database.go | 126 ++++++++++++++++-------------- dbos/workflow.go | 165 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 220 insertions(+), 71 deletions(-) diff --git a/dbos/system_database.go b/dbos/system_database.go index ac1abc4..ea97e92 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -440,9 +440,10 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt timeoutMs = &millis } - inputString, err := serialize(input.status.Input) - if err != nil { - return nil, fmt.Errorf("failed to serialize input: %w", err) + // Input should already be serialized by caller + inputString, ok := input.status.Input.(string) + if !ok && input.status.Input != nil { + return nil, fmt.Errorf("workflow input must be a pre-encoded string, got %T", input.status.Input) } // Our DB works with NULL values @@ -791,18 +792,15 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.Error = errors.New(*errorStr) } - wf.Output, err = deserialize(outputString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize output: %w", err) + // Return output as encoded string (stored as any) + if outputString != nil { + wf.Output = *outputString } } - // Handle input only if loadInput is true - if input.loadInput { - wf.Input, err = deserialize(inputString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize input: %w", err) - } + // Return input as encoded string (stored as any) + if input.loadInput && inputString != nil { + wf.Input = *inputString } workflows = append(workflows, wf) @@ -830,9 +828,10 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO SET status = $1, output = $2, error = $3, updated_at = $4, deduplication_id = NULL WHERE workflow_uuid = $5 AND NOT (status = $6 AND $1 in ($7, $8))`, pgx.Identifier{s.schema}.Sanitize()) - outputString, err := serialize(input.output) - if err != nil { - return fmt.Errorf("failed to serialize output: %w", err) + // Output should already be serialized by caller + outputString, ok := input.output.(string) + if !ok && input.output != nil { + return fmt.Errorf("workflow output must be a pre-encoded string, got %T", input.output) } var errorStr string @@ -840,6 +839,7 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO errorStr = input.err.Error() } + var err error if input.tx != nil { _, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError) } else { @@ -1105,9 +1105,10 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st recovery_attempts ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, pgx.Identifier{s.schema}.Sanitize()) - inputString, err := serialize(originalWorkflow.Input) - if err != nil { - return "", fmt.Errorf("failed to serialize input: %w", err) + // Input should already be serialized in originalWorkflow.Input + inputString, ok := originalWorkflow.Input.(string) + if !ok && originalWorkflow.Input != nil { + return "", fmt.Errorf("workflow input must be a pre-encoded string, got %T", originalWorkflow.Input) } // Marshal authenticated roles (slice of strings) to JSON for TEXT column @@ -1179,10 +1180,10 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any return nil, fmt.Errorf("failed to query workflow status: %w", err) } - // Deserialize output from TEXT to bytes then from bytes to R using gob - output, err := deserialize(outputString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize output: %w", err) + // Return output as encoded string + var output any + if outputString != nil { + output = *outputString } switch status { @@ -1219,11 +1220,13 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation errorString = &e } - outputString, err := serialize(input.output) - if err != nil { - return fmt.Errorf("failed to serialize output: %w", err) + // Output should already be serialized by caller + outputString, ok := input.output.(string) + if !ok && input.output != nil { + return fmt.Errorf("step output must be a pre-encoded string, got %T", input.output) } + var err error if input.tx != nil { _, err = input.tx.Exec(ctx, query, input.workflowID, @@ -1431,9 +1434,10 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio return nil, newUnexpectedStepError(input.workflowID, input.stepID, input.stepName, recordedFunctionName) } - output, err := deserialize(outputString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize output: %w", err) + // Return output as encoded string + var output any + if outputString != nil { + output = *outputString } var recordedError error @@ -1485,13 +1489,9 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInpu return nil, fmt.Errorf("failed to scan step row: %w", err) } - // Deserialize output if present and loadOutput is true + // Return output as encoded string if loadOutput is true if input.loadOutput && outputString != nil { - output, err := deserialize(outputString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize output: %w", err) - } - step.Output = output + step.Output = *outputString } // Convert error string to error if present @@ -1564,10 +1564,20 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err return 0, fmt.Errorf("no recorded end time for recorded sleep operation") } + // Deserialize the recorded end time + encodedStr, ok := recordedResult.output.(string) + if !ok { + return 0, fmt.Errorf("recorded output must be encoded string, got %T", recordedResult.output) + } + decodedOutput, err := deserialize(&encodedStr) + if err != nil { + return 0, fmt.Errorf("failed to deserialize sleep end time: %w", err) + } + // The output should be a time.Time representing the end time - endTimeInterface, ok := recordedResult.output.(time.Time) + endTimeInterface, ok := decodedOutput.(time.Time) if !ok { - return 0, fmt.Errorf("recorded output is not a time.Time: %T", recordedResult.output) + return 0, fmt.Errorf("decoded output is not a time.Time: %T", decodedOutput) } endTime = endTimeInterface @@ -1578,12 +1588,18 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err // First execution: calculate and record the end time endTime = time.Now().Add(input.duration) + // Serialize the end time before recording + encodedEndTime, serErr := serialize(endTime) + if serErr != nil { + return 0, fmt.Errorf("failed to serialize sleep end time: %w", serErr) + } + // Record the operation result with the calculated end time recordInput := recordOperationResultDBInput{ workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, - output: endTime, + output: encodedEndTime, err: nil, } @@ -1783,10 +1799,10 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error { topic = input.Topic } - // Serialize the message. It must have been registered with encoding/gob by the user if not a basic type. - messageString, err := serialize(input.Message) - if err != nil { - return fmt.Errorf("failed to serialize message: %w", err) + // Message should already be serialized by caller + messageString, ok := input.Message.(string) + if !ok && input.Message != nil { + return fmt.Errorf("message must be a pre-encoded string, got %T", input.Message) } insertQuery := fmt.Sprintf(`INSERT INTO %s.notifications (destination_uuid, topic, message) VALUES ($1, $2, $3)`, pgx.Identifier{s.schema}.Sanitize()) @@ -1947,16 +1963,13 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) { } } - // Deserialize the message + // Return message as encoded string var message any - if messageString != nil { // nil message can happen on the timeout path only - message, err = deserialize(messageString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize message: %w", err) - } + if messageString != nil { + message = *messageString } - // Record the operation result + // Record the operation result (with encoded message string) recordInput := recordOperationResultDBInput{ workflowID: destinationID, stepID: stepID, @@ -2018,10 +2031,10 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error return nil } - // Serialize the message. It must have been registered with encoding/gob by the user if not a basic type. - messageString, err := serialize(input.Message) - if err != nil { - return fmt.Errorf("failed to serialize message: %w", err) + // Message should already be serialized by caller + messageString, ok := input.Message.(string) + if !ok && input.Message != nil { + return fmt.Errorf("event value must be a pre-encoded string, got %T", input.Message) } // Insert or update the event using UPSERT @@ -2160,16 +2173,13 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) } } - // Deserialize the value if it exists + // Return value as encoded string var value any if valueString != nil { - value, err = deserialize(valueString) - if err != nil { - return nil, fmt.Errorf("failed to deserialize event value: %w", err) - } + value = *valueString } - // Record the operation result if this is called within a workflow + // Record the operation result if this is called within a workflow (with encoded value string) if isInWorkflow { recordInput := recordOperationResultDBInput{ workflowID: wfState.workflowID, diff --git a/dbos/workflow.go b/dbos/workflow.go index 5117933..5907df7 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -249,9 +249,23 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) defer cancel() } - result, err := retryWithResult(ctx, func() (any, error) { + encodedResult, err := retryWithResult(ctx, func() (any, error) { return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(ctx, h.workflowID) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) + + // Deserialize the result + var result any + if encodedResult != nil { + encodedStr, ok := encodedResult.(string) + if !ok { + return *new(R), newWorkflowUnexpectedResultType(h.workflowID, "string (encoded)", fmt.Sprintf("%T", encodedResult)) + } + result, err = deserialize(&encodedStr) + if err != nil { + return *new(R), fmt.Errorf("failed to deserialize workflow result: %w", err) + } + } + if result != nil { typedResult, ok := result.(R) if !ok { @@ -806,6 +820,13 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt if params.priority > uint(math.MaxInt) { return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt) } + + // Serialize input before storing in workflow status + encodedInput, serErr := serialize(input) + if serErr != nil { + return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to serialize workflow input: %w", serErr)) + } + workflowStatus := WorkflowStatus{ Name: params.workflowName, ApplicationVersion: params.applicationVersion, @@ -815,7 +836,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt CreatedAt: time.Now(), Deadline: deadline, Timeout: timeout, - Input: input, + Input: encodedInput, ApplicationID: c.GetApplicationID(), QueueName: params.queueName, DeduplicationID: params.deduplicationID, @@ -955,12 +976,21 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt status = WorkflowStatusCancelled } + // Serialize the output before recording + encodedOutput, serErr := serialize(result) + if serErr != nil { + c.logger.Error("Failed to serialize workflow output", "workflow_id", workflowID, "error", serErr) + outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("failed to serialize output: %w", serErr)} + close(outcomeChan) + return + } + recordErr := retry(c, func() error { return c.systemDB.updateWorkflowOutcome(uncancellableCtx, updateWorkflowOutcomeDBInput{ workflowID: workflowID, status: status, err: err, - output: result, + output: encodedOutput, }) }, withRetrierLogger(c.logger)) if recordErr != nil { @@ -1178,7 +1208,19 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("checking operation execution: %w", err)) } if recordedOutput != nil { - return recordedOutput.output, recordedOutput.err + // Deserialize the recorded output + var decodedOutput any + if recordedOutput.output != nil { + encodedStr, ok := recordedOutput.output.(string) + if !ok { + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("recorded output must be encoded string, got %T", recordedOutput.output)) + } + decodedOutput, err = deserialize(&encodedStr) + if err != nil { + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("failed to deserialize recorded output: %w", err)) + } + } + return decodedOutput, recordedOutput.err } // Spawn a child DBOSContext with the step state @@ -1228,13 +1270,19 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) } } + // Serialize step output before recording + encodedStepOutput, serErr := serialize(stepOutput) + if serErr != nil { + return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("failed to serialize step output: %w", serErr)) + } + // Record the final result dbInput := recordOperationResultDBInput{ workflowID: stepState.workflowID, stepName: stepOpts.stepName, stepID: stepState.stepID, err: stepError, - output: stepOutput, + output: encodedStepOutput, } recErr := retry(c, func() error { return c.systemDB.recordOperationResult(uncancellableCtx, dbInput) @@ -1251,10 +1299,16 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) /****************************************/ func (c *dbosContext) Send(_ DBOSContext, destinationID string, message any, topic string) error { + // Serialize the message before sending + encodedMessage, err := serialize(message) + if err != nil { + return fmt.Errorf("failed to serialize message: %w", err) + } + return retry(c, func() error { return c.systemDB.send(c, WorkflowSendInput{ DestinationID: destinationID, - Message: message, + Message: encodedMessage, Topic: topic, }) }, withRetrierLogger(c.logger)) @@ -1292,9 +1346,22 @@ func (c *dbosContext) Recv(_ DBOSContext, topic string, timeout time.Duration) ( Topic: topic, Timeout: timeout, } - return retryWithResult(c, func() (any, error) { + encodedMsg, err := retryWithResult(c, func() (any, error) { return c.systemDB.recv(c, input) }, withRetrierLogger(c.logger)) + if err != nil { + return nil, err + } + + // Deserialize the received message + if encodedMsg != nil { + encodedStr, ok := encodedMsg.(string) + if !ok { + return nil, fmt.Errorf("received message must be encoded string, got %T", encodedMsg) + } + return deserialize(&encodedStr) + } + return nil, nil } // Recv receives a message sent to this workflow with type safety. @@ -1332,10 +1399,16 @@ func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error } func (c *dbosContext) SetEvent(_ DBOSContext, key string, message any) error { + // Serialize the event value before storing + encodedMessage, err := serialize(message) + if err != nil { + return fmt.Errorf("failed to serialize event value: %w", err) + } + return retry(c, func() error { return c.systemDB.setEvent(c, WorkflowSetEventInput{ Key: key, - Message: message, + Message: encodedMessage, }) }, withRetrierLogger(c.logger)) } @@ -1375,9 +1448,22 @@ func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID, key string, time Key: key, Timeout: timeout, } - return retryWithResult(c, func() (any, error) { + encodedValue, err := retryWithResult(c, func() (any, error) { return c.systemDB.getEvent(c, input) }, withRetrierLogger(c.logger)) + if err != nil { + return nil, err + } + + // Deserialize the event value + if encodedValue != nil { + encodedStr, ok := encodedValue.(string) + if !ok { + return nil, fmt.Errorf("event value must be encoded string, got %T", encodedValue) + } + return deserialize(&encodedStr) + } + return nil, nil } // GetEvent retrieves a key-value event from a target workflow with type safety. @@ -1940,15 +2026,46 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) } // Call the context method to list workflows + var workflows []WorkflowStatus + var err error workflowState, ok := c.Value(workflowStateKey).(*workflowState) isWithinWorkflow := ok && workflowState != nil if isWithinWorkflow { - return RunAsStep(c, func(ctx context.Context) ([]WorkflowStatus, error) { + workflows, err = RunAsStep(c, func(ctx context.Context) ([]WorkflowStatus, error) { return c.systemDB.listWorkflows(ctx, dbInput) }, WithStepName("DBOS.listWorkflows")) } else { - return c.systemDB.listWorkflows(c, dbInput) + workflows, err = c.systemDB.listWorkflows(c, dbInput) } + if err != nil { + return nil, err + } + + // Deserialize Input and Output fields if they were loaded + if params.loadInput || params.loadOutput { + for i := range workflows { + if params.loadInput && workflows[i].Input != nil { + encodedStr, ok := workflows[i].Input.(string) + if ok && encodedStr != "" { + workflows[i].Input, err = deserialize(&encodedStr) + if err != nil { + return nil, fmt.Errorf("failed to deserialize workflow input for %s: %w", workflows[i].ID, err) + } + } + } + if params.loadOutput && workflows[i].Output != nil { + encodedStr, ok := workflows[i].Output.(string) + if ok && encodedStr != "" { + workflows[i].Output, err = deserialize(&encodedStr) + if err != nil { + return nil, fmt.Errorf("failed to deserialize workflow output for %s: %w", workflows[i].ID, err) + } + } + } + } + } + + return workflows, nil } // ListWorkflows retrieves a list of workflows based on the provided filters. @@ -2014,15 +2131,37 @@ func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]Step loadOutput: loadOutput, } + var steps []StepInfo + var err error workflowState, ok := c.Value(workflowStateKey).(*workflowState) isWithinWorkflow := ok && workflowState != nil if isWithinWorkflow { - return RunAsStep(c, func(ctx context.Context) ([]StepInfo, error) { + steps, err = RunAsStep(c, func(ctx context.Context) ([]StepInfo, error) { return c.systemDB.getWorkflowSteps(ctx, getWorkflowStepsInput) }, WithStepName("DBOS.getWorkflowSteps")) } else { - return c.systemDB.getWorkflowSteps(c, getWorkflowStepsInput) + steps, err = c.systemDB.getWorkflowSteps(c, getWorkflowStepsInput) + } + if err != nil { + return nil, err } + + // Deserialize Output fields if they were loaded + if loadOutput { + for i := range steps { + if steps[i].Output != nil { + encodedStr, ok := steps[i].Output.(string) + if ok && encodedStr != "" { + steps[i].Output, err = deserialize(&encodedStr) + if err != nil { + return nil, fmt.Errorf("failed to deserialize step output for step %d: %w", steps[i].StepID, err) + } + } + } + } + } + + return steps, nil } // GetWorkflowSteps retrieves the execution steps of a workflow. From 9852279e90c0f74a92da5f7423ac2c51ce56d252 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 14:33:29 -0700 Subject: [PATCH 04/11] encode in enqueue, client --- dbos/client.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dbos/client.go b/dbos/client.go index c04130b..dc71316 100644 --- a/dbos/client.go +++ b/dbos/client.go @@ -147,6 +147,13 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu if params.priority > uint(math.MaxInt) { return nil, fmt.Errorf("priority %d exceeds maximum allowed value %d", params.priority, math.MaxInt) } + + // Serialize input before storing in workflow status + encodedInput, err := serialize(params.workflowInput) + if err != nil { + return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to serialize workflow input: %w", err)) + } + status := WorkflowStatus{ Name: params.workflowName, ApplicationVersion: params.applicationVersion, @@ -155,7 +162,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu CreatedAt: time.Now(), Deadline: deadline, Timeout: params.workflowTimeout, - Input: params.workflowInput, + Input: encodedInput, QueueName: queueName, DeduplicationID: params.deduplicationID, Priority: int(params.priority), From f945e3b02d2e7e571daecd6a0dea428220a48a7a Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 14:44:52 -0700 Subject: [PATCH 05/11] fix --- dbos/client.go | 2 +- dbos/workflow.go | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/dbos/client.go b/dbos/client.go index dc71316..a8c8fa7 100644 --- a/dbos/client.go +++ b/dbos/client.go @@ -151,7 +151,7 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu // Serialize input before storing in workflow status encodedInput, err := serialize(params.workflowInput) if err != nil { - return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to serialize workflow input: %w", err)) + return nil, fmt.Errorf("failed to serialize workflow input: %w", err) } status := WorkflowStatus{ diff --git a/dbos/workflow.go b/dbos/workflow.go index 5907df7..340d0cd 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -253,16 +253,17 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) return h.dbosContext.(*dbosContext).systemDB.awaitWorkflowResult(ctx, h.workflowID) }, withRetrierLogger(h.dbosContext.(*dbosContext).logger)) - // Deserialize the result + // Deserialize the result (but preserve any error from awaitWorkflowResult) var result any if encodedResult != nil { encodedStr, ok := encodedResult.(string) if !ok { return *new(R), newWorkflowUnexpectedResultType(h.workflowID, "string (encoded)", fmt.Sprintf("%T", encodedResult)) } - result, err = deserialize(&encodedStr) - if err != nil { - return *new(R), fmt.Errorf("failed to deserialize workflow result: %w", err) + var deserErr error + result, deserErr = deserialize(&encodedStr) + if deserErr != nil { + return *new(R), fmt.Errorf("failed to deserialize workflow result: %w", deserErr) } } From ecad764893ce4f2a7eae4c48ecfd2419ebc771bc Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 14:57:39 -0700 Subject: [PATCH 06/11] decode on the concurrent execution fallback path --- dbos/workflow.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 340d0cd..0156bf6 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -959,9 +959,25 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt // Handle DBOS ID conflict errors by waiting workflow result if errors.Is(err, &DBOSError{Code: ConflictingIDError}) { c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID) - result, err = retryWithResult(c, func() (any, error) { + var encodedResult any + encodedResult, err = retryWithResult(c, func() (any, error) { return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID) }, withRetrierLogger(c.logger)) + var deserErr error + encodedResultString, ok := encodedResult.(string) + if !ok { + c.logger.Error("Unexpected result type when awaiting workflow result after ID conflict", "workflow_id", workflowID, "type", fmt.Sprintf("%T", encodedResult)) + outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("unexpected result type when awaiting workflow result after ID conflict: expected string, got %T", encodedResult)} + close(outcomeChan) + return + } + result, deserErr = deserialize(&encodedResultString) + if deserErr != nil { + c.logger.Error("Failed to deserialize workflow result after ID conflict", "workflow_id", workflowID, "error", deserErr) + outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("failed to deserialize workflow result after ID conflict: %w", deserErr)} + close(outcomeChan) + return + } } else { status := WorkflowStatusSuccess From 16f6aea885a401b8fe3e661789bb871d81718e25 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 16:49:44 -0700 Subject: [PATCH 07/11] system db expects pointers to string, correctly set in serialize() --- dbos/serialization.go | 34 +++++++---- dbos/system_database.go | 129 +++++++++++----------------------------- dbos/workflow.go | 64 +++++++++----------- 3 files changed, 85 insertions(+), 142 deletions(-) diff --git a/dbos/serialization.go b/dbos/serialization.go index c2e8814..7ec90bc 100644 --- a/dbos/serialization.go +++ b/dbos/serialization.go @@ -9,21 +9,33 @@ import ( "strings" ) -func serialize(data any) (string, error) { - var inputBytes []byte - if data != nil { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(&data); err != nil { - return "", fmt.Errorf("failed to encode data: %w", err) - } - inputBytes = buf.Bytes() +func serialize(data any) (*string, error) { + // Handle nil values specially - return nil pointer which will be stored as NULL in DB + if data == nil { + return nil, nil + } + + // Handle empty string specially - return pointer to empty string which will be stored as "" in DB + if str, ok := data.(string); ok && str == "" { + return &str, nil } - return base64.StdEncoding.EncodeToString(inputBytes), nil + + // Register the type with gob to avoid interface{} issues + safeGobRegister(data, nil) + + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(data); err != nil { + return nil, fmt.Errorf("failed to encode data: %w", err) + } + inputBytes := buf.Bytes() + + encoded := base64.StdEncoding.EncodeToString(inputBytes) + return &encoded, nil } func deserialize(data *string) (any, error) { - if data == nil || *data == "" { + if data == nil { return nil, nil } diff --git a/dbos/system_database.go b/dbos/system_database.go index ea97e92..9a41c94 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -55,7 +55,7 @@ type systemDatabase interface { // Communication (special steps) send(ctx context.Context, input WorkflowSendInput) error - recv(ctx context.Context, input recvInput) (any, error) + recv(ctx context.Context, input recvInput) (*string, error) setEvent(ctx context.Context, input WorkflowSetEventInput) error getEvent(ctx context.Context, input getEventInput) (any, error) @@ -440,12 +440,6 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt timeoutMs = &millis } - // Input should already be serialized by caller - inputString, ok := input.status.Input.(string) - if !ok && input.status.Input != nil { - return nil, fmt.Errorf("workflow input must be a pre-encoded string, got %T", input.status.Input) - } - // Our DB works with NULL values var applicationVersion *string if len(input.status.ApplicationVersion) > 0 { @@ -517,7 +511,7 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt updatedAt.UnixMilli(), timeoutMs, deadline, - inputString, + input.status.Input, // encoded input deduplicationID, input.status.Priority, WorkflowStatusEnqueued, @@ -792,15 +786,13 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( wf.Error = errors.New(*errorStr) } - // Return output as encoded string (stored as any) - if outputString != nil { - wf.Output = *outputString - } + // Return output as encoded *string + wf.Output = outputString } - // Return input as encoded string (stored as any) - if input.loadInput && inputString != nil { - wf.Input = *inputString + // Return input as encoded *string + if input.loadInput { + wf.Input = inputString } workflows = append(workflows, wf) @@ -816,7 +808,7 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ( type updateWorkflowOutcomeDBInput struct { workflowID string status WorkflowStatusType - output any + output *string err error tx pgx.Tx } @@ -828,12 +820,6 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO SET status = $1, output = $2, error = $3, updated_at = $4, deduplication_id = NULL WHERE workflow_uuid = $5 AND NOT (status = $6 AND $1 in ($7, $8))`, pgx.Identifier{s.schema}.Sanitize()) - // Output should already be serialized by caller - outputString, ok := input.output.(string) - if !ok && input.output != nil { - return fmt.Errorf("workflow output must be a pre-encoded string, got %T", input.output) - } - var errorStr string if input.err != nil { errorStr = input.err.Error() @@ -841,9 +827,9 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO var err error if input.tx != nil { - _, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError) + _, err = input.tx.Exec(ctx, query, input.status, input.output, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError) } else { - _, err = s.pool.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError) + _, err = s.pool.Exec(ctx, query, input.status, input.output, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusSuccess, WorkflowStatusError) } if err != nil { @@ -1105,12 +1091,6 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st recovery_attempts ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)`, pgx.Identifier{s.schema}.Sanitize()) - // Input should already be serialized in originalWorkflow.Input - inputString, ok := originalWorkflow.Input.(string) - if !ok && originalWorkflow.Input != nil { - return "", fmt.Errorf("workflow input must be a pre-encoded string, got %T", originalWorkflow.Input) - } - // Marshal authenticated roles (slice of strings) to JSON for TEXT column authenticatedRoles, err := json.Marshal(originalWorkflow.AuthenticatedRoles) @@ -1128,7 +1108,7 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st &appVersion, originalWorkflow.ApplicationID, _DBOS_INTERNAL_QUEUE_NAME, - inputString, + originalWorkflow.Input, // encoded input time.Now().UnixMilli(), time.Now().UnixMilli(), 0) @@ -1204,7 +1184,7 @@ type recordOperationResultDBInput struct { workflowID string stepID int stepName string - output any + output *string err error tx pgx.Tx } @@ -1220,18 +1200,12 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation errorString = &e } - // Output should already be serialized by caller - outputString, ok := input.output.(string) - if !ok && input.output != nil { - return fmt.Errorf("step output must be a pre-encoded string, got %T", input.output) - } - var err error if input.tx != nil { _, err = input.tx.Exec(ctx, query, input.workflowID, input.stepID, - outputString, + input.output, errorString, input.stepName, ) @@ -1239,7 +1213,7 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation _, err = s.pool.Exec(ctx, query, input.workflowID, input.stepID, - outputString, + input.output, errorString, input.stepName, ) @@ -1329,7 +1303,7 @@ type recordChildGetResultDBInput struct { parentWorkflowID string childWorkflowID string stepID int - output string + output *string err error } @@ -1364,7 +1338,7 @@ func (s *sysDB) recordChildGetResult(ctx context.Context, input recordChildGetRe /*******************************/ type recordedResult struct { - output any + output *string err error } @@ -1434,18 +1408,12 @@ func (s *sysDB) checkOperationExecution(ctx context.Context, input checkOperatio return nil, newUnexpectedStepError(input.workflowID, input.stepID, input.stepName, recordedFunctionName) } - // Return output as encoded string - var output any - if outputString != nil { - output = *outputString - } - var recordedError error if errorStr != nil && *errorStr != "" { recordedError = errors.New(*errorStr) } result := &recordedResult{ - output: output, + output: outputString, err: recordedError, } return result, nil @@ -1490,8 +1458,8 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInpu } // Return output as encoded string if loadOutput is true - if input.loadOutput && outputString != nil { - step.Output = *outputString + if input.loadOutput { + step.Output = outputString } // Convert error string to error if present @@ -1565,11 +1533,7 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err } // Deserialize the recorded end time - encodedStr, ok := recordedResult.output.(string) - if !ok { - return 0, fmt.Errorf("recorded output must be encoded string, got %T", recordedResult.output) - } - decodedOutput, err := deserialize(&encodedStr) + decodedOutput, err := deserialize(recordedResult.output) if err != nil { return 0, fmt.Errorf("failed to deserialize sleep end time: %w", err) } @@ -1746,7 +1710,7 @@ const _DBOS_NULL_TOPIC = "__null__topic__" type WorkflowSendInput struct { DestinationID string - Message any + Message *string Topic string } @@ -1799,14 +1763,8 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error { topic = input.Topic } - // Message should already be serialized by caller - messageString, ok := input.Message.(string) - if !ok && input.Message != nil { - return fmt.Errorf("message must be a pre-encoded string, got %T", input.Message) - } - insertQuery := fmt.Sprintf(`INSERT INTO %s.notifications (destination_uuid, topic, message) VALUES ($1, $2, $3)`, pgx.Identifier{s.schema}.Sanitize()) - _, err = tx.Exec(ctx, insertQuery, input.DestinationID, topic, messageString) + _, err = tx.Exec(ctx, insertQuery, input.DestinationID, topic, input.Message) if err != nil { // Check for foreign key violation (destination workflow doesn't exist) if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_FOREIGN_KEY_VIOLATION { @@ -1841,7 +1799,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error { } // Recv is a special type of step that receives a message destined for a given workflow -func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) { +func (s *sysDB) recv(ctx context.Context, input recvInput) (*string, error) { functionName := "DBOS.recv" // Get workflow state from context @@ -1901,7 +1859,7 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) { err = s.pool.QueryRow(ctx, query, destinationID, topic).Scan(&exists) if err != nil { cond.L.Unlock() - return false, fmt.Errorf("failed to check message: %w", err) + return nil, fmt.Errorf("failed to check message: %w", err) } if !exists { done := make(chan struct{}) @@ -1955,26 +1913,17 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) { var messageString *string err = tx.QueryRow(ctx, query, destinationID, topic).Scan(&messageString) if err != nil { - if err == pgx.ErrNoRows { - // No message found, record nil result - messageString = nil - } else { + if err != pgx.ErrNoRows { return nil, fmt.Errorf("failed to consume message: %w", err) } } - // Return message as encoded string - var message any - if messageString != nil { - message = *messageString - } - // Record the operation result (with encoded message string) recordInput := recordOperationResultDBInput{ workflowID: destinationID, stepID: stepID, stepName: functionName, - output: message, + output: messageString, tx: tx, } err = s.recordOperationResult(ctx, recordInput) @@ -1986,12 +1935,13 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) { return nil, fmt.Errorf("failed to commit transaction: %w", err) } - return message, nil + // Return the message string pointer + return messageString, nil } type WorkflowSetEventInput struct { Key string - Message any + Message *string } func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error { @@ -2031,19 +1981,13 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error return nil } - // Message should already be serialized by caller - messageString, ok := input.Message.(string) - if !ok && input.Message != nil { - return fmt.Errorf("event value must be a pre-encoded string, got %T", input.Message) - } - // Insert or update the event using UPSERT insertQuery := fmt.Sprintf(`INSERT INTO %s.workflow_events (workflow_uuid, key, value) VALUES ($1, $2, $3) ON CONFLICT (workflow_uuid, key) DO UPDATE SET value = EXCLUDED.value`, pgx.Identifier{s.schema}.Sanitize()) - _, err = tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, messageString) + _, err = tx.Exec(ctx, insertQuery, wfState.workflowID, input.Key, input.Message) if err != nil { return fmt.Errorf("failed to insert/update workflow event: %w", err) } @@ -2173,19 +2117,13 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) } } - // Return value as encoded string - var value any - if valueString != nil { - value = *valueString - } - - // Record the operation result if this is called within a workflow (with encoded value string) + // Record the operation result if this is called within a workflow if isInWorkflow { recordInput := recordOperationResultDBInput{ workflowID: wfState.workflowID, stepID: stepID, stepName: functionName, - output: value, + output: valueString, err: nil, } @@ -2195,7 +2133,8 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) } } - return value, nil + // Return the value string pointer + return valueString, nil } /*******************************/ diff --git a/dbos/workflow.go b/dbos/workflow.go index 0156bf6..2c3f857 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1228,11 +1228,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) // Deserialize the recorded output var decodedOutput any if recordedOutput.output != nil { - encodedStr, ok := recordedOutput.output.(string) - if !ok { - return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("recorded output must be encoded string, got %T", recordedOutput.output)) - } - decodedOutput, err = deserialize(&encodedStr) + decodedOutput, err = deserialize(recordedOutput.output) if err != nil { return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("failed to deserialize recorded output: %w", err)) } @@ -1363,22 +1359,14 @@ func (c *dbosContext) Recv(_ DBOSContext, topic string, timeout time.Duration) ( Topic: topic, Timeout: timeout, } - encodedMsg, err := retryWithResult(c, func() (any, error) { + encodedMsg, err := retryWithResult(c, func() (*string, error) { return c.systemDB.recv(c, input) }, withRetrierLogger(c.logger)) if err != nil { return nil, err } - // Deserialize the received message - if encodedMsg != nil { - encodedStr, ok := encodedMsg.(string) - if !ok { - return nil, fmt.Errorf("received message must be encoded string, got %T", encodedMsg) - } - return deserialize(&encodedStr) - } - return nil, nil + return deserialize(encodedMsg) } // Recv receives a message sent to this workflow with type safety. @@ -2062,22 +2050,26 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) if params.loadInput || params.loadOutput { for i := range workflows { if params.loadInput && workflows[i].Input != nil { - encodedStr, ok := workflows[i].Input.(string) - if ok && encodedStr != "" { - workflows[i].Input, err = deserialize(&encodedStr) - if err != nil { - return nil, fmt.Errorf("failed to deserialize workflow input for %s: %w", workflows[i].ID, err) - } + encodedInput, ok := workflows[i].Input.(*string) + if !ok { + return nil, fmt.Errorf("workflow input must be encoded string, got %T", workflows[i].Input) } + decodedInput, err := deserialize(encodedInput) + if err != nil { + return nil, fmt.Errorf("failed to deserialize workflow input for %s: %w", workflows[i].ID, err) + } + workflows[i].Input = decodedInput } if params.loadOutput && workflows[i].Output != nil { - encodedStr, ok := workflows[i].Output.(string) - if ok && encodedStr != "" { - workflows[i].Output, err = deserialize(&encodedStr) - if err != nil { - return nil, fmt.Errorf("failed to deserialize workflow output for %s: %w", workflows[i].ID, err) - } + encodedOutput, ok := workflows[i].Output.(*string) + if !ok { + return nil, fmt.Errorf("workflow output must be encoded string, got %T", workflows[i].Output) + } + decodedOutput, err := deserialize(encodedOutput) + if err != nil { + return nil, fmt.Errorf("failed to deserialize workflow output for %s: %w", workflows[i].ID, err) } + workflows[i].Output = decodedOutput } } } @@ -2163,18 +2155,18 @@ func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]Step return nil, err } - // Deserialize Output fields if they were loaded + // Deserialize outputs if asked to if loadOutput { for i := range steps { - if steps[i].Output != nil { - encodedStr, ok := steps[i].Output.(string) - if ok && encodedStr != "" { - steps[i].Output, err = deserialize(&encodedStr) - if err != nil { - return nil, fmt.Errorf("failed to deserialize step output for step %d: %w", steps[i].StepID, err) - } - } + encodedOutput, ok := steps[i].Output.(*string) + if !ok { + return nil, fmt.Errorf("step output must be encoded string, got %T", steps[i].Output) + } + decodedOutput, err := deserialize(encodedOutput) + if err != nil { + return nil, fmt.Errorf("failed to deserialize step output for step %d: %w", steps[i].StepID, err) } + steps[i].Output = decodedOutput } } From f05d0c92c58ba50dc414a5fd1eba20f48971c848 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 17:07:40 -0700 Subject: [PATCH 08/11] not now --- dbos/serialization.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/dbos/serialization.go b/dbos/serialization.go index 7ec90bc..014b067 100644 --- a/dbos/serialization.go +++ b/dbos/serialization.go @@ -20,9 +20,6 @@ func serialize(data any) (*string, error) { return &str, nil } - // Register the type with gob to avoid interface{} issues - safeGobRegister(data, nil) - var buf bytes.Buffer enc := gob.NewEncoder(&buf) if err := enc.Encode(data); err != nil { From a3bd6f7f305cd0427a4c3ca91be6a000c3882bc0 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 17:16:27 -0700 Subject: [PATCH 09/11] fix + awaitWorkflowResult returns *string --- dbos/recovery.go | 14 ++++++-------- dbos/serialization.go | 4 ++++ dbos/system_database.go | 16 +++++----------- dbos/workflow.go | 9 +++++---- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/dbos/recovery.go b/dbos/recovery.go index 48277c7..fefd56d 100644 --- a/dbos/recovery.go +++ b/dbos/recovery.go @@ -17,17 +17,15 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow // Deserialize the workflow input var decodedInput any if workflow.Input != nil { - inputStr, ok := workflow.Input.(string) + inputString, ok := workflow.Input.(*string) if !ok { - ctx.logger.Warn("Skipping workflow recovery: input is not an encoded string", "workflow_id", workflow.ID, "name", workflow.Name, "input_type", workflow.Input) + ctx.logger.Warn("Skipping workflow recovery due to invalid input type", "workflow_id", workflow.ID, "name", workflow.Name, "input_type", workflow.Input) continue } - if inputStr != "" { - decodedInput, err = deserialize(&inputStr) - if err != nil { - ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name, "error", err) - continue - } + decodedInput, err = deserialize(inputString) + if err != nil { + ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name, "error", err) + continue } } diff --git a/dbos/serialization.go b/dbos/serialization.go index 014b067..5964468 100644 --- a/dbos/serialization.go +++ b/dbos/serialization.go @@ -36,6 +36,10 @@ func deserialize(data *string) (any, error) { return nil, nil } + if *data == "" { + return "", nil + } + dataBytes, err := base64.StdEncoding.DecodeString(*data) if err != nil { return nil, fmt.Errorf("failed to decode data: %w", err) diff --git a/dbos/system_database.go b/dbos/system_database.go index 9a41c94..aae1380 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -37,7 +37,7 @@ type systemDatabase interface { insertWorkflowStatus(ctx context.Context, input insertWorkflowStatusDBInput) (*insertWorkflowResult, error) listWorkflows(ctx context.Context, input listWorkflowsDBInput) ([]WorkflowStatus, error) updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error - awaitWorkflowResult(ctx context.Context, workflowID string) (any, error) + awaitWorkflowResult(ctx context.Context, workflowID string) (*string, error) cancelWorkflow(ctx context.Context, workflowID string) error cancelAllBefore(ctx context.Context, cutoffTime time.Time) error resumeWorkflow(ctx context.Context, workflowID string) error @@ -1138,7 +1138,7 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st return forkedWorkflowID, nil } -func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any, error) { +func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (*string, error) { query := fmt.Sprintf(`SELECT status, output, error FROM %s.workflow_status WHERE workflow_uuid = $1`, pgx.Identifier{s.schema}.Sanitize()) var status WorkflowStatusType for { @@ -1160,20 +1160,14 @@ func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any return nil, fmt.Errorf("failed to query workflow status: %w", err) } - // Return output as encoded string - var output any - if outputString != nil { - output = *outputString - } - switch status { case WorkflowStatusSuccess, WorkflowStatusError: if errorStr == nil || len(*errorStr) == 0 { - return output, nil + return outputString, nil } - return output, errors.New(*errorStr) + return outputString, errors.New(*errorStr) case WorkflowStatusCancelled: - return output, newAwaitedWorkflowCancelledError(workflowID) + return outputString, newAwaitedWorkflowCancelledError(workflowID) default: time.Sleep(_DB_RETRY_INTERVAL) } diff --git a/dbos/workflow.go b/dbos/workflow.go index 2c3f857..91b193b 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -256,12 +256,12 @@ func (h *workflowPollingHandle[R]) GetResult(opts ...GetResultOption) (R, error) // Deserialize the result (but preserve any error from awaitWorkflowResult) var result any if encodedResult != nil { - encodedStr, ok := encodedResult.(string) + encodedStr, ok := encodedResult.(*string) if !ok { return *new(R), newWorkflowUnexpectedResultType(h.workflowID, "string (encoded)", fmt.Sprintf("%T", encodedResult)) } var deserErr error - result, deserErr = deserialize(&encodedStr) + result, deserErr = deserialize(encodedStr) if deserErr != nil { return *new(R), fmt.Errorf("failed to deserialize workflow result: %w", deserErr) } @@ -964,14 +964,15 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID) }, withRetrierLogger(c.logger)) var deserErr error - encodedResultString, ok := encodedResult.(string) + encodedResultString, ok := encodedResult.(*string) if !ok { c.logger.Error("Unexpected result type when awaiting workflow result after ID conflict", "workflow_id", workflowID, "type", fmt.Sprintf("%T", encodedResult)) outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("unexpected result type when awaiting workflow result after ID conflict: expected string, got %T", encodedResult)} close(outcomeChan) return } - result, deserErr = deserialize(&encodedResultString) + fmt.Println("Encoded result string:", *encodedResultString) + result, deserErr = deserialize(encodedResultString) if deserErr != nil { c.logger.Error("Failed to deserialize workflow result after ID conflict", "workflow_id", workflowID, "error", deserErr) outcomeChan <- workflowOutcome[any]{result: nil, err: fmt.Errorf("failed to deserialize workflow result after ID conflict: %w", deserErr)} From 7c8211aa9b2117e379d6209280350aa90ed7d9dc Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 17:35:13 -0700 Subject: [PATCH 10/11] fixes --- dbos/queue.go | 10 ++++------ dbos/serialization.go | 2 +- dbos/system_database.go | 9 ++------- dbos/workflow.go | 4 ++-- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/dbos/queue.go b/dbos/queue.go index 5897de3..ca55999 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -226,12 +226,10 @@ func (qr *queueRunner) run(ctx *dbosContext) { // Deserialize input using the centralized deserialize function var input any - if len(workflow.input) > 0 { - input, err = deserialize(&workflow.input) - if err != nil { - qr.logger.Error("failed to deserialize input for workflow", "workflow_id", workflow.id, "error", err) - continue - } + input, err = deserialize(workflow.input) + if err != nil { + qr.logger.Error("failed to deserialize input for workflow", "workflow_id", workflow.id, "error", err) + continue } _, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id)) diff --git a/dbos/serialization.go b/dbos/serialization.go index 5964468..7747c88 100644 --- a/dbos/serialization.go +++ b/dbos/serialization.go @@ -22,7 +22,7 @@ func serialize(data any) (*string, error) { var buf bytes.Buffer enc := gob.NewEncoder(&buf) - if err := enc.Encode(data); err != nil { + if err := enc.Encode(&data); err != nil { return nil, fmt.Errorf("failed to encode data: %w", err) } inputBytes := buf.Bytes() diff --git a/dbos/system_database.go b/dbos/system_database.go index aae1380..88a962e 100644 --- a/dbos/system_database.go +++ b/dbos/system_database.go @@ -2138,7 +2138,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error) type dequeuedWorkflow struct { id string name string - input string + input *string } type dequeueWorkflowsInput struct { @@ -2340,21 +2340,16 @@ func (s *sysDB) dequeueWorkflows(ctx context.Context, input dequeueWorkflowsInpu WHERE workflow_uuid = $5 RETURNING name, inputs`, pgx.Identifier{s.schema}.Sanitize()) - var inputString *string err := tx.QueryRow(ctx, updateQuery, WorkflowStatusPending, input.applicationVersion, input.executorID, time.Now().UnixMilli(), - id).Scan(&retWorkflow.name, &inputString) + id).Scan(&retWorkflow.name, &retWorkflow.input) if err != nil { return nil, fmt.Errorf("failed to update workflow %s during dequeue: %w", id, err) } - if inputString != nil && len(*inputString) > 0 { - retWorkflow.input = *inputString - } - retWorkflows = append(retWorkflows, retWorkflow) } diff --git a/dbos/workflow.go b/dbos/workflow.go index 91b193b..876f4f5 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1463,11 +1463,11 @@ func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID, key string, time // Deserialize the event value if encodedValue != nil { - encodedStr, ok := encodedValue.(string) + encodedStr, ok := encodedValue.(*string) if !ok { return nil, fmt.Errorf("event value must be encoded string, got %T", encodedValue) } - return deserialize(&encodedStr) + return deserialize(encodedStr) } return nil, nil } From e87d0e6863a2a7912dd0a253e241b001cb0cd603 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 29 Oct 2025 17:56:32 -0700 Subject: [PATCH 11/11] nit --- dbos/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbos/queue.go b/dbos/queue.go index ca55999..af1d104 100644 --- a/dbos/queue.go +++ b/dbos/queue.go @@ -224,7 +224,7 @@ func (qr *queueRunner) run(ctx *dbosContext) { continue } - // Deserialize input using the centralized deserialize function + // Deserialize input var input any input, err = deserialize(workflow.input) if err != nil {