Skip to content

Conversation

@sleipnir
Copy link
Collaborator

@sleipnir sleipnir commented Oct 21, 2025

🎯 Context

This PR introduces a new error handling and side-effect model for the GRPC.Stream module, covering both unary and streaming flows.
The implementation improves safety, consistency, and expressiveness across Flow-based pipelines used in gRPC message processing, ensuring that exceptions, errors, and inconsistent return values are handled uniformly and predictably.


🚀 New Features

🧩 1. effect/2

A new utility function to safely perform side effects within a stream pipeline without modifying the values being processed.

GRPC.Stream.from(....)
|> GRPC.Stream.effect(fn x -> Logger.debug("Processing #{x}") end)
|> GRPC.Stream.run()
  • Ensures exceptions raised inside the effect function do not break the stream.
  • Uses the internal safe_invoke/2 mechanism to capture exceptions and thrown values.
  • Behaves like a safe tap/2, preserving all original stream data.

⚙️ 2. map_error/2

Adds a declarative mechanism for mapping or transforming errors ({:error, reason}) within a stream into structured GRPC.RPCError values.

GRPC.Stream.from([1, 2])
|> GRPC.Stream.map(fn
  2 -> raise "boom"
  x -> x
end)
|> GRPC.Stream.map_error(fn
  {:error, reason} ->
    GRPC.RPCError.exception(message: "Validation or runtime error: #{inspect(reason)}")
  other ->
    other
end)

Behavior:

  • {:error, reason} → transformed into %GRPC.RPCError{}
  • {:ok, value} → unwrapped to value
  • exceptions from previous operators (e.g. map/2) → can be transformed into domain-specific GRPC.RPCError
  • normal values are propagated unchanged

This enables localized error recovery and translation, useful for both input validation and unexpected runtime exceptions.


🔄 3. Unified Error Matching and Propagation

All stream operators now use the internal safe_invoke/2 wrapper, which standardizes how functions inside the pipeline behave.

Function result Propagated value Description
{:ok, value} value Normalized success
{:error, reason} {:error, reason} Propagated as-is
raise (exception) {:error, exception} Caught and wrapped
throw value {:error, {:throw, value}} Uniformly captured
raw value same Passed through unchanged

This makes all pipelines exception-safe — any operator can raise or throw, but the stream will continue gracefully with well-defined {:error, reason} items.


💡 4. Integration with run/1 and run_with/3

Both stream finalizers (run/1, run_with/3) have been updated to:

  • Handle {:error, reason} tuples and convert them to %GRPC.RPCError{}.
  • Support both unary and streaming flows.
  • Ensure that errors in one element do not interrupt subsequent elements in the stream.
  • Allow dry_run mode to test flow execution without emitting gRPC responses.

✅ 5. Test Coverage

New and extended tests added in GRPC.StreamTest include:

  • effect/2: ensures side effects are applied safely and do not affect data flow, even when exceptions occur inside the callback.
  • map_error/2: verifies correct error mapping and recovery.
  • Validation of error propagation across all operators (map, flat_map, filter, ask, etc.).
  • Resilience against process failures (:process_not_alive, :timeout) in ask/3.
  • Full pipeline integration tests combining multiple operators and error recovery logic.

🧠 Impact

This refactor:

  • Unifies and simplifies error handling across the entire streaming subsystem.
  • Makes pipelines more robust — no unexpected crashes due to raised errors.
  • Improves observability: all errors now flow through %GRPC.RPCError{}.
  • Adds new extensibility points for metrics, tracing, and recovery.

🔮 Next Steps

  • Integrate tracing hooks to improve runtime observability.
  • Update the docs.

@sleipnir sleipnir requested a review from polvalente October 21, 2025 03:55
@yordis
Copy link
Contributor

yordis commented Oct 21, 2025

A new utility function to safely perform side effects within a stream pipeline without modifying the values being processed.

Other ecosystem call it https://doc.rust-lang.org/std/result/enum.Result.html#method.inspect_err

Eitherway, fine by me

@sleipnir
Copy link
Collaborator Author

A new utility function to safely perform side effects within a stream pipeline without modifying the values being processed.

Other ecosystem call it https://doc.rust-lang.org/std/result/enum.Result.html#method.inspect_err

Eitherway, fine by me

In this case, side effects aren't necessarily linked to an error; side effects in this context are any operation that shouldn't affect the flow. Think of a call to an external system (e.g., sending an email) that shouldn't affect the processing of the main flow, and is therefore a side effect. Then inspect_err would be a misnomer in this context. But I added map_error that can serve this purpose.

sleipnir and others added 3 commits October 22, 2025 00:20
Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
@sleipnir sleipnir requested a review from polvalente October 22, 2025 03:56
@sleipnir sleipnir marked this pull request as ready for review October 22, 2025 19:00
Copy link
Contributor

@yordis yordis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 💜

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants