Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
781a7cc
chore: release new version
May 10, 2025
ce286dc
Merge remote-tracking branch 'upstream/master'
May 14, 2025
dae7deb
merge
May 28, 2025
9fe3c55
Merge remote-tracking branch 'upstream/master'
Jul 15, 2025
2a76e93
Merge remote-tracking branch 'upstream/master'
Jul 31, 2025
3af0153
Merge remote-tracking branch 'upstream/master'
sleipnir Sep 9, 2025
4feee64
Merge remote-tracking branch 'upstream/master'
sleipnir Sep 11, 2025
bae8b92
Merge remote-tracking branch 'upstream/master'
sleipnir Oct 13, 2025
0cfb96d
Merge remote-tracking branch 'upstream/master'
sleipnir Oct 14, 2025
db695ec
bump 0.10.2 -> 0.11.0
sleipnir Oct 14, 2025
4465815
merge with master
sleipnir Oct 16, 2025
93517c8
feat: added new function to handle side-effects
sleipnir Oct 20, 2025
b03b837
chore: added doc, remove comments
sleipnir Oct 20, 2025
b7af0d3
feat: added error handler unary and stream pipelines
sleipnir Oct 21, 2025
5ed5a8f
test: added many more tests
sleipnir Oct 21, 2025
eb6fdd4
Update lib/grpc/stream.ex
sleipnir Oct 22, 2025
0933617
Update lib/grpc/stream.ex
sleipnir Oct 22, 2025
c73eba3
fix: correct return type in doc
sleipnir Oct 22, 2025
9bf8fd2
Update lib/grpc/stream/operators.ex
sleipnir Oct 22, 2025
c1daaea
Merge branch 'feat/error-handler' of https://github.com/sleipnir/grpc…
sleipnir Oct 22, 2025
acaf147
chore: updated after review
sleipnir Oct 22, 2025
f6bffae
docs: adds a better explanation of the different types of input
sleipnir Oct 22, 2025
140e1a9
mix format
sleipnir Oct 22, 2025
4ba7367
Merge remote-tracking branch 'upstream/master'
sleipnir Oct 22, 2025
e9d27fd
chore: merge and new api integration tests
sleipnir Oct 22, 2025
34c8dd4
test: reintroduces tests that were removed by mistake
sleipnir Oct 22, 2025
28ed00c
docs: introduces documentation for error handling and side effects
sleipnir Oct 22, 2025
015fadf
Merge branch 'master' into feat/error-handler
sleipnir Oct 22, 2025
a5c72f3
Merge branch 'master' into feat/error-handler
sleipnir Oct 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions examples/helloworld_streams/lib/helloworld_streams/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ defmodule HelloworldStreams.Server do
def say_unary_hello(request, _materializer) do
GRPCStream.unary(request)
|> GRPCStream.ask(Transformer)
|> GRPCStream.map(fn %HelloReply{} = reply ->
%HelloReply{message: "[Reply] #{reply.message}"}
|> GRPCStream.map(fn
%HelloReply{} = reply ->
%HelloReply{message: "[Reply] #{reply.message}"}

{:error, reason} ->
GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")
end)
|> GRPCStream.run()
end
Expand Down
145 changes: 134 additions & 11 deletions lib/grpc/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,19 @@ defmodule GRPC.Stream do
# We have to call `Enum.to_list` because we want to actually run and materialize the full stream.
# List.flatten and List.first are used so that we can obtain the first result of the materialized list.
flow
|> Flow.map(fn
{:ok, msg} ->
msg

{:error, %GRPC.RPCError{} = reason} ->
reason

{:error, reason} ->
GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")

msg ->
msg
end)
|> Enum.to_list()
|> List.flatten()
|> List.first()
Expand Down Expand Up @@ -190,19 +203,125 @@ defmodule GRPC.Stream do
raise ArgumentError, "run_with/3 is not supported for unary streams"
end

dry_run? = Keyword.get(opts, :dry_run, false)

flow
|> Flow.map(fn msg ->
if not dry_run? do
send_response(from, msg)
end
|> Flow.map(fn
{:ok, msg} ->
send_response(from, msg, opts)

flow

{:error, %GRPC.RPCError{} = reason} ->
send_response(from, reason, opts)
flow

flow
{:error, reason} ->
msg = GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")
send_response(from, msg, opts)
flow

msg ->
send_response(from, msg, opts)
flow
end)
|> Flow.run()
end

@doc """
Intercepts and transforms **error tuples** or **unexpected exceptions** that occur
within a gRPC stream pipeline.

`map_error/3` allows graceful handling or recovery from errors produced by previous
operators (e.g. `map/2`, `flat_map/2`) or from validation logic applied to incoming data.

The provided `handler/1` function receives the error reason (or the exception struct)
and can either:

* Return a new error tuple — e.g. `{:error, new_reason}` — to re-emit a modified error.
* Return any other value to recover from the failure and continue the pipeline.

This makes it suitable for both **input validation** and **capturing unexpected runtime errors**
in stream transformations.

## Parameters

- `stream` — The input stream or `Flow` pipeline.
- `func` — A function that takes an error reason or exception and returns either a new value or an error tuple.

## Returns

- A new stream where all error tuples and raised exceptions are processed by `func/1`.

## Examples

iex> GRPC.Stream.from([1, 2])
...> |> GRPC.Stream.map(fn
...> 2 -> raise "boom"
...> x -> x
...> end)
...> |> GRPC.Stream.map_error(fn
...> {:error, _reason} ->
...> GRPC.RPCError.exception(message: "Validation or runtime error")
...> msg -> msg
...> end)

In this example:

* The call to `GRPC.Stream.map/2` raises an exception for value `2`.
* `map_error/3` catches the error and wraps it in a `GRPC.RPCError` struct with a custom message.
* The pipeline continues execution, transforming errors into structured responses.

## Notes

- `map_error/3` is **lazy** and only executes when the stream is materialized
(via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`).

- Use this operator to implement **robust error recovery**, **input validation**, or
to normalize exceptions from downstream Flow stages into well-defined gRPC errors.
"""
defdelegate map_error(stream, func), to: Operators

@doc """
Applies a **side-effecting function** to each element of the stream **without altering** its values.

The `effect/2` function is useful for performing **imperative or external actions**
(such as logging, sending messages, collecting metrics, or debugging)
while preserving the original stream data.

It behaves like `Enum.each/2`, but returns the stream itself so it can continue in the pipeline.

## Examples

```elixir
iex> parent = self()
iex> stream =
...> GRPC.Stream.from([1, 2, 3])
...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end)
...> |> GRPC.Stream.to_flow()
...> |> Enum.to_list()
iex> assert_receive {:seen, 2}
iex> assert_receive {:seen, 4}
iex> assert_receive {:seen, 6}
iex> stream
[1, 2, 3]
```
In this example, the effect/2 function sends a message to the current process
for each element in the stream, but the resulting stream values remain unchanged.

## Parameters

- `stream` — The input `GRPC.Stream`.
- `effect_fun` — A function that receives each item and performs a side effect
(e.g. IO.inspect/1, Logger.info/1, send/2, etc.).

### Notes

- This function is **lazy** — the `effect_fun` will only run once the stream is materialized
(e.g. via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`).
- The use of `effect/2` ensures that the original item is returned unchanged,
enabling seamless continuation of the pipeline.
"""
defdelegate effect(stream, effect_fun), to: Operators

@doc """
Sends a request to an external process and awaits a response.

Expand All @@ -220,9 +339,9 @@ defmodule GRPC.Stream do
## Returns

- Updated stream if successful.
- `{:error, item, reason}` if the request fails or times out.
- `{:error, reason}` if the request fails or times out.
"""
@spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, item(), reason()}
@spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, reason()}
defdelegate ask(stream, target, timeout \\ 5000), to: Operators

@doc """
Expand Down Expand Up @@ -369,7 +488,11 @@ defmodule GRPC.Stream do
%__MODULE__{flow: flow, options: opts}
end

defp send_response(from, msg) do
GRPC.Server.send_reply(from, msg)
defp send_response(from, msg, opts) do
dry_run? = Keyword.get(opts, :dry_run, false)

if not dry_run? do
GRPC.Server.send_reply(from, msg)
end
end
end
97 changes: 88 additions & 9 deletions lib/grpc/stream/operators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ defmodule GRPC.Stream.Operators do
@type reason :: any()

@spec ask(GRPCStream.t(), pid | atom, non_neg_integer) ::
GRPCStream.t() | {:error, any(), :timeout | :not_alive}
GRPCStream.t() | {:error, :timeout | :process_not_alive}
def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do
mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end
# mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end
mapper = fn item -> safe_invoke(&do_ask(&1, target, timeout, raise_on_error: false), item) end
%GRPCStream{stream | flow: Flow.map(flow, mapper)}
end

Expand All @@ -33,7 +34,7 @@ defmodule GRPC.Stream.Operators do
raise "Target #{inspect(target)} is not alive. Cannot send request to it."

is_nil(resolved_target) ->
{:error, item, :not_alive}
{:error, :process_not_alive}

true ->
send(resolved_target, {:request, item, self()})
Expand All @@ -45,25 +46,46 @@ defmodule GRPC.Stream.Operators do
if raise? do
raise "Timeout waiting for response from #{inspect(target)}"
else
{:error, item, :timeout}
{:error, :timeout}
end
end
end
end

@spec effect(GRPCStream.t(), (term -> term())) :: GRPCStream.t()
def effect(%GRPCStream{flow: flow} = stream, effect_fun) when is_function(effect_fun, 1) do
%GRPCStream{
stream
| flow:
Flow.map(flow, fn flow_item ->
tap(flow_item, fn item -> safe_invoke(effect_fun, item) end)
end)
}
end

@spec filter(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
def filter(%GRPCStream{flow: flow} = stream, filter) do
%GRPCStream{stream | flow: Flow.filter(flow, filter)}
flow_wrapper = Flow.filter(flow, fn item -> safe_invoke(filter, item) end)
%GRPCStream{stream | flow: flow_wrapper}
end

@spec flat_map(GRPCStream.t(), (term -> Enumerable.GRPCStream.t())) :: GRPCStream.t()
def flat_map(%GRPCStream{flow: flow} = stream, flat_mapper) do
%GRPCStream{stream | flow: Flow.flat_map(flow, flat_mapper)}
flow_wrapper =
Flow.flat_map(flow, fn item ->
case safe_invoke(flat_mapper, item) do
{:error, reason} -> [{:error, reason}]
res -> res
end
end)

%GRPCStream{stream | flow: flow_wrapper}
end

@spec map(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
def map(%GRPCStream{flow: flow} = stream, mapper) do
%GRPCStream{stream | flow: Flow.map(flow, mapper)}
flow_wrapper = Flow.map(flow, fn item -> safe_invoke(mapper, item) end)
%GRPCStream{stream | flow: flow_wrapper}
end

@spec map_with_context(GRPCStream.t(), (map(), term -> term)) :: GRPCStream.t()
Expand All @@ -73,7 +95,41 @@ defmodule GRPC.Stream.Operators do
mapper.(meta, item)
end

%GRPCStream{stream | flow: Flow.map(flow, wrapper)}
flow_wrapper = Flow.map(flow, fn item -> safe_invoke(wrapper, item) end)

%GRPCStream{stream | flow: flow_wrapper}
end

@spec map_error(GRPCStream.t(), (reason -> term)) :: GRPCStream.t()
def map_error(%GRPCStream{flow: flow} = stream, func) when is_function(func, 1) do
mapper =
Flow.map(flow, fn
{:error, _reason} = item ->
res = safe_invoke(func, item)

case res do
{:error, %GRPC.RPCError{} = new_reason} ->
{:error, new_reason}

{:error, new_reason} ->
msg = "[Error] #{inspect(new_reason)}"
{:error, GRPC.RPCError.exception(message: msg)}

{:ok, other} ->
other

other ->
other
end

{:ok, other} ->
other

other ->
other
end)

%GRPCStream{stream | flow: mapper}
end

@spec partition(GRPCStream.t(), keyword()) :: GRPCStream.t()
Expand All @@ -88,7 +144,8 @@ defmodule GRPC.Stream.Operators do

@spec reject(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
def reject(%GRPCStream{flow: flow} = stream, filter) do
%GRPCStream{stream | flow: Flow.reject(flow, filter)}
flow_wrapper = Flow.reject(flow, fn item -> safe_invoke(filter, item) end)
%GRPCStream{stream | flow: flow_wrapper}
end

@spec uniq(GRPCStream.t()) :: GRPCStream.t()
Expand All @@ -100,4 +157,26 @@ defmodule GRPC.Stream.Operators do
def uniq_by(%GRPCStream{flow: flow} = stream, fun) do
%GRPCStream{stream | flow: Flow.uniq_by(flow, fun)}
end

# Normalizes and catches exceptions/throws.
# Returns:
# value -> successful value
# {:error, reason} -> failure
defp safe_invoke(fun, arg) do
try do
res = fun.(arg)

case res do
{:ok, v} -> v
{:error, reason} -> {:error, reason}
other -> other
end
rescue
e ->
{:error, e}
catch
kind, reason ->
{:error, {kind, reason}}
end
end
end
Loading
Loading