diff --git a/README.md b/README.md
index 6f266dc7..67ce8648 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,10 @@
   - [Unary RPC using Stream API](#unary-rpc-using-stream-api)
   - [Server-Side Streaming](#server-side-streaming)
   - [Bidirectional Streaming](#bidirectional-streaming)
+  - [Effects and Error Handling](#effects-and-error-handling)
+    - [Side Effects](#side-effects-with-effect2)
+    - [Recovery from errors](#recovery-from-errors)
+    - [Unified Error Matching and Propagation](#unified-error-matching-and-propagation)
 - [Application Startup](#application-startup)
 - [Client Usage](#client-usage)
   - [Basic Connection and RPC](#basic-connection-and-rpc)
@@ -101,8 +105,9 @@ defmodule HelloworldStreams.Server do
   alias Helloworld.HelloReply
 
   @spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
-  def say_unary_hello(request, _materializer) do
-    GRPC.Stream.unary(request)
+  def say_unary_hello(request, materializer) do
+    request
+    |> GRPC.Stream.unary(materializer: materializer)
     |> GRPC.Stream.map(fn %HelloReply{} = reply ->
       %HelloReply{message: "[Reply] #{reply.message}"}
     end)
@@ -144,28 +149,104 @@ def say_bid_stream_hello(request, materializer) do
   |> GRPC.Stream.run_with(materializer)
 end
 ```
-The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below:
-
-| Function                         | Description  | Parameters / Options  |
-|:---------------------------------|:-------------|:----------------------|
-| **`from(input, opts \\\\ [])`**  | Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. | **Parameters:**
• `input` — stream, list, or gRPC struct.
**Options:**
• `:join_with` — PID or name of an external `GenStage` producer.
• `:dispatcher` — dispatcher module (default: `GenStage.DemandDispatcher`).
• `:propagate_context` — if `true`, propagates the materializer context.
• `:materializer` — the current `%GRPC.Server.Stream{}`.
• Other options supported by `Flow`. |
-| **`unary(input, opts \\\\ [])`** | Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. | **Parameters:**
• `input` — single gRPC message.
**Options:** same as `from/2`. |
-| **`to_flow(stream)`**            | Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. | **Parameters:**
• `stream` — `%GRPC.Stream{}` struct. |
-| **`run(stream)`**                | Executes the `Flow` for a unary stream and returns the first materialized result. | **Parameters:**
• `stream` — `%GRPC.Stream{}` with `unary: true` option. |
-| **`run_with(stream, materializer, opts \\\\ [])`** | Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `materializer` — `%GRPC.Server.Stream{}`.
**Options:**
• `:dry_run` — if `true`, responses are not sent. |
-| **`ask(stream, target, timeout \\\\ 5000)`** | Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `target` — PID or atom.
• `timeout` — in milliseconds. |
-| **`ask!(stream, target, timeout \\\\ 5000)`** | Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. |
-| **`filter(stream, fun)`** | Filters items in the stream by applying a concurrent predicate function. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — function `(item -> boolean)`. |
-| **`flat_map(stream, fun)`** | Applies a function returning a list or enumerable, flattening the results. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> Enumerable.t())`. |
-| **`map(stream, fun)`** | Applies a transformation function to each item in the stream. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> term)`. |
-| **`map_with_context(stream, fun)`** | Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(context, item -> term)`. |
-| **`partition(stream, opts \\\\ [])`** | Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `opts` — partitioning options (`Flow.partition/2`). |
-| **`reduce(stream, acc_fun, reducer_fun)`** | Reduces the stream using an accumulator, useful for aggregations. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `acc_fun` — initializer function `() -> acc`.
• `reducer_fun` — `(item, acc -> acc)`. |
-| **`uniq(stream)`** | Emits only distinct items from the stream (no custom uniqueness criteria). | **Parameters:**
• `stream` — `%GRPC.Stream{}`. |
-| **`uniq_by(stream, fun)`** | Emits only unique items based on the return value of the provided function. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> term)` for uniqueness determination. |
-| **`get_headers(stream)`** | Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. | **Parameters:**
• `stream` — `%GRPC.Server.Stream{}`.
**Returns:** `map` containing decoded headers. |
-
-For a complete list of available operators see [here](lib/grpc/stream.ex).
+The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex).
+
+---
+
+### Effects and Error Handling
+
+#### Side Effects
+
+The `effect/2` operator executes user-defined functions for each element in the stream, allowing the integration of non-transformative actions such as logging, metrics, or external notifications.
+
+Unlike transformation operators (e.g., `map/2`), `effect/2` does not modify or filter values — it preserves the original stream while executing the provided callback safely for each emitted element.
+
+```elixir
+iex> parent = self()
+iex> stream =
+...>   GRPC.Stream.from([1, 2, 3])
+...>   |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x * 2}) end)
+...>   |> GRPC.Stream.to_flow()
+...>   |> Enum.to_list()
+iex> assert_receive {:seen, 2}
+iex> assert_receive {:seen, 4}
+iex> assert_receive {:seen, 6}
+iex> stream
+[1, 2, 3]
+```
+
+Key characteristics:
+
+* The callback function (`effect_fun`) is invoked for each item emitted downstream.
+* The result of the callback is ignored, ensuring that the stream’s structure and values remain unchanged.
+* Execution is lazy and occurs only when the stream is materialized using run/1, run_with/3, or to_flow/1.
+* Exceptions raised inside the callback are captured internally, preventing interruption of the dataflow.
+
+This operator is designed for observability, telemetry, auditing, and integration with external systems that must react to events flowing through the gRPC stream.
+
+---
+
+#### Recovery from errors
+
+The `map_error/2` operator intercepts and transforms errors or exceptions emitted by previous stages in a stream pipeline.
+
+It provides a unified mechanism for handling:
+
+* Expected errors, such as validation or domain failures (`{:error, reason}`)
+* Unexpected runtime errors, including raised or thrown exceptions inside other operators.
+
+```elixir
+iex> GRPC.Stream.from([1, 2])
+...> |> GRPC.Stream.map(fn
+...>   2 -> raise "boom"
+...>   x -> x
+...> end)
+...> |> GRPC.Stream.map_error(fn
+...>   {:error, {:exception, _reason}} ->
+...>     {:error, GRPC.RPCError.exception(message: "Booomm")}
+...> end)
+``` 
+
+In this example:
+
+* The function inside `map/2` raises an exception for the value `2`.
+* `map_error/2` captures and transforms that error into a structured `GRPC.RPCError` response.
+* The stream continues processing without being interrupted.
+
+This makes map_error/2 suitable for input validation, runtime fault recovery, and user-facing error translation within gRPC pipelines.
+
+---
+
+#### Unified Error Matching and Propagation
+
+All stream operators share a unified error propagation model that guarantees consistent handling of exceptions and failures across the pipeline.
+
+This ensures that user-defined functions within the stream — whether pure transformations, side effects, or external calls — always produce a predictable and recoverable result, maintaining the integrity of the dataflow even in the presence of unexpected errors.
+
+```elixir
+def say_unary_hello(request, _materializer) do
+  GRPCStream.unary(request)
+  |> GRPCStream.ask(Transformer)
+  |> GRPCStream.map(fn
+    %HelloReply{} = reply ->
+      %HelloReply{message: "[Reply] #{reply.message}"}
+
+    {:error, reason} ->
+      {:error, GRPC.RPCError.exception(message: "error calling external process: #{inspect(reason)}")}
+    
+    error ->
+      Logger.error("Unknown error")
+      error
+  end)
+  |> GRPCStream.run()
+end
+```
+
+By normalizing all possible outcomes, `GRPC.Stream` ensures fault-tolerant, exception-safe pipelines where operators can freely raise, throw, or return tuples without breaking the flow execution.
+
+This unified model allows developers to build composable and reliable streaming pipelines that gracefully recover from both domain and runtime errors.
+
+>_NOTE_: In the example above, we could use `map_error/2` instead of `map/2` to handle error cases explicitly. However, since the function also performs a transformation on successful values, `map/2` remains appropriate and useful in this context.
 
 ---
 
@@ -175,7 +256,7 @@ Add the server supervisor to your application's supervision tree:
 
 ```elixir
 defmodule Helloworld.Application do
-  @ false
+  @moduledoc false
   use Application
 
   @impl true
diff --git a/examples/helloworld_streams/lib/helloworld_streams/server.ex b/examples/helloworld_streams/lib/helloworld_streams/server.ex
index 76771939..a9976dba 100644
--- a/examples/helloworld_streams/lib/helloworld_streams/server.ex
+++ b/examples/helloworld_streams/lib/helloworld_streams/server.ex
@@ -14,8 +14,12 @@ defmodule HelloworldStreams.Server do
   def say_unary_hello(request, _materializer) do
     GRPCStream.unary(request)
     |> GRPCStream.ask(Transformer)
-    |> GRPCStream.map(fn %HelloReply{} = reply ->
-      %HelloReply{message: "[Reply] #{reply.message}"}
+    |> GRPCStream.map(fn
+      %HelloReply{} = reply ->
+        %HelloReply{message: "[Reply] #{reply.message}"}
+
+      {:error, reason} ->
+        GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")
     end)
     |> GRPCStream.run()
   end
diff --git a/lib/grpc/client/connection.ex b/lib/grpc/client/connection.ex
index 0ce01076..863f385b 100644
--- a/lib/grpc/client/connection.ex
+++ b/lib/grpc/client/connection.ex
@@ -1,12 +1,12 @@
 defmodule GRPC.Client.Connection do
   @moduledoc """
-  Connection manager for gRPC client channels, with optional **load balancing**
-  and **name resolution** support.
+  Connection manager for gRPC client channels, with optional load balancing
+  and name resolution support.
 
   A `Conn` process manages one or more underlying gRPC connections
-  (`GRPC.Channel` structs) and exposes a **virtual channel** to be used by
+  (`GRPC.Channel` structs) and exposes a virtual channel to be used by
   client stubs. The orchestration process runs as a `GenServer` registered
-  globally (via `:global`), so only one orchestrator exists **per connection**
+  globally (via `:global`), so only one orchestrator exists per connection
   in a BEAM node.
 
   ## Overview
diff --git a/lib/grpc/client/resolver.ex b/lib/grpc/client/resolver.ex
index de605c46..1a3b7d33 100644
--- a/lib/grpc/client/resolver.ex
+++ b/lib/grpc/client/resolver.ex
@@ -2,7 +2,7 @@ defmodule GRPC.Client.Resolver do
   @moduledoc """
   Behaviour for gRPC client resolvers.
 
-  A gRPC resolver is responsible for translating a **target string** into
+  A gRPC resolver is responsible for translating a target string into
   a list of connection endpoints (addresses) and an optional `ServiceConfig`.
 
   gRPC supports multiple naming schemes, allowing clients to connect
diff --git a/lib/grpc/client/resolver/ipv6.ex b/lib/grpc/client/resolver/ipv6.ex
index 14a5d771..17cba904 100644
--- a/lib/grpc/client/resolver/ipv6.ex
+++ b/lib/grpc/client/resolver/ipv6.ex
@@ -9,7 +9,7 @@ defmodule GRPC.Client.Resolver.IPv6 do
 
       ipv6:[addr][:port][,[addr][:port],...]
 
-  - IPv6 addresses **must** be enclosed in square brackets (`[...]`).
+  - IPv6 addresses must be enclosed in square brackets (`[...]`).
   - The port is optional; if not provided, the default port is `443`.
   - Multiple addresses can be comma-separated.
   - `service_config` is always `nil` as IPv6 literals do not support DNS TXT or xDS.
diff --git a/lib/grpc/client/resolver/unix.ex b/lib/grpc/client/resolver/unix.ex
index 358e67a9..718e6a84 100644
--- a/lib/grpc/client/resolver/unix.ex
+++ b/lib/grpc/client/resolver/unix.ex
@@ -10,7 +10,7 @@ defmodule GRPC.Client.Resolver.Unix do
 
       unix:///absolute/path/to/socket
 
-  - The scheme **must** be `unix`.
+  - The scheme must be `unix`.
   - The path must be absolute (`/var/run/my.sock`).
   - The port is not used in Unix sockets; `:port` will be `nil`.
   - The socket type is indicated via `:socket => :unix`.
diff --git a/lib/grpc/client/service_config.ex b/lib/grpc/client/service_config.ex
index d81301af..987142eb 100644
--- a/lib/grpc/client/service_config.ex
+++ b/lib/grpc/client/service_config.ex
@@ -9,12 +9,12 @@ defmodule GRPC.Client.ServiceConfig do
 
   According to the gRPC specification ([service_config.md](https://github.com/grpc/grpc/blob/master/doc/service_config.md)):
 
-  - **loadBalancingConfig**: a list of load balancing policies.  
+  - loadBalancingConfig: a list of load balancing policies.
     The client should pick the first policy it supports. Common values are:
       - `"pick_first"`: always pick the first server.
       - `"round_robin"`: distribute calls across servers in round-robin.
 
-  - **methodConfig**: a list of configurations applied to specific methods or services.  
+  - methodConfig: a list of configurations applied to specific methods or services.
     Each entry can include:
       - `"name"`: a list of `{ "service": "", "method": "" }`
         or `{ "service": "" }` to match all methods in the service.
diff --git a/lib/grpc/protoc/cli.ex b/lib/grpc/protoc/cli.ex
index c60afae5..0a468ac4 100644
--- a/lib/grpc/protoc/cli.ex
+++ b/lib/grpc/protoc/cli.ex
@@ -2,7 +2,7 @@ defmodule GRPC.Protoc.CLI do
   @moduledoc """
   `protoc` plugin for generating Elixir code.
 
-  `protoc-gen-elixir` (this name is important) **must** be in `$PATH`. You are not supposed
+  `protoc-gen-elixir` (this name is important) must be in `$PATH`. You are not supposed
   to call it directly, but only through `protoc`.
 
   ## Examples
diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex
index 9099dd54..faeeedcb 100644
--- a/lib/grpc/stream.ex
+++ b/lib/grpc/stream.ex
@@ -158,7 +158,7 @@ defmodule GRPC.Stream do
             "GRPC.Stream.run/1 requires a materializer to be set in the GRPC.Stream"
     end
 
-    send_response(materializer, Enum.at(flow, 0))
+    send_response(materializer, Enum.at(flow, 0), opts)
 
     :noreply
   end
@@ -193,19 +193,71 @@ defmodule GRPC.Stream do
       raise ArgumentError, "run_with/3 is not supported for unary streams"
     end
 
-    dry_run? = Keyword.get(opts, :dry_run, false)
-
     flow
-    |> Flow.map(fn msg ->
-      if not dry_run? do
-        send_response(from, msg)
-      end
-
-      flow
+    |> Flow.map(fn
+      {:ok, msg} ->
+        send_response(from, msg, opts)
+        flow
+
+      {:error, %GRPC.RPCError{} = reason} ->
+        send_response(from, reason, opts)
+        flow
+
+      {:error, reason} ->
+        msg = GRPC.RPCError.exception(message: "#{inspect(reason)}")
+        send_response(from, msg, opts)
+        flow
+
+      msg ->
+        send_response(from, msg, opts)
+        flow
     end)
     |> Flow.run()
   end
 
+  @doc """
+  Applies a side-effect function to each element of the stream without altering its values.
+
+  The `effect/2` function is useful for performing imperative or external actions
+  (such as logging, sending messages, collecting metrics, or debugging)
+  while preserving the original stream data.
+
+  It behaves like `Enum.each/2`, but returns the stream itself so it can continue in the pipeline.
+
+  ## Examples
+
+  ```elixir
+  iex> parent = self()
+  iex> stream =
+  ...>   GRPC.Stream.from([1, 2, 3])
+  ...>   |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end)
+  ...>   |> GRPC.Stream.to_flow()
+  ...>   |> Enum.to_list()
+  iex> assert_receive {:seen, 2}
+  iex> assert_receive {:seen, 4}
+  iex> assert_receive {:seen, 6}
+  iex> stream
+  [1, 2, 3]
+  ```
+  In this example, the effect/2 function sends a message to the current process
+  for each element in the stream, but the resulting stream values remain unchanged.
+
+  ## Parameters
+
+  - `stream` — The input `GRPC.Stream`.
+  - `effect_fun` — A function that receives each item and performs a side effect
+  (e.g. IO.inspect/1, Logger.info/1, send/2, etc.).
+
+  ### Notes
+
+  - This function is lazy — the `effect_fun` will only run once the stream is materialized
+  (e.g. via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`).
+  - The use of `effect/2` ensures that the original item is returned unchanged,
+  enabling seamless continuation of the pipeline.
+  """
+  @spec effect(t(), (term -> any)) :: t()
+  defdelegate effect(stream, effect_fun), to: Operators
+
   @doc """
   Sends a request to an external process and awaits a response.
 
@@ -220,12 +272,8 @@ defmodule GRPC.Stream do
     - `target`: Target process PID or atom name.
     - `timeout`: Timeout in milliseconds (defaults to `5000`).
 
-  ## Returns
-
-    - Updated stream if successful.
-    - `{:error, item, reason}` if the request fails or times out.
   """
-  @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, item(), reason()}
+  @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, :timeout | :process_not_alive}
   defdelegate ask(stream, target, timeout \\ 5000), to: Operators
 
   @doc """
@@ -261,6 +309,64 @@ defmodule GRPC.Stream do
   @spec map(t(), (term -> term)) :: t()
   defdelegate map(stream, mapper), to: Operators
 
+  @doc """
+  Intercepts and transforms error tuples or unexpected exceptions that occur
+  within a gRPC stream pipeline.
+
+  `map_error/3` allows graceful handling or recovery from errors produced by previous
+  operators (e.g. `map/2`, `flat_map/2`) or from validation logic applied to incoming data.
+
+  The provided `handler/1` function receives the error reason (or the exception struct) like:
+
+      {:error, reason} -> failure
+      {:error, {:exception, exception}} -> failure due to exception
+      {:error, {kind, reason}} -> failure due to throw or exit
+
+  And can either:
+
+    * Return a new error tuple — e.g. `{:error, new_reason}` — to re-emit a modified error.
+    * Return any other value to recover from the failure and continue the pipeline.
+
+  This makes it suitable for both input validation and capturing unexpected runtime errors
+  in stream transformations.
+
+  ## Parameters
+
+    - `stream` — The input stream or `Flow` pipeline.
+    - `func` — A function that takes an error reason or exception and returns either a new value or an error tuple.
+
+  ## Returns
+
+    - A new stream where all error tuples and raised exceptions are processed by `func/1`.
+
+  ## Examples
+
+      iex> GRPC.Stream.from([1, 2])
+      ...> |> GRPC.Stream.map(fn
+      ...>   2 -> raise "boom"
+      ...>   x -> x
+      ...> end)
+      ...> |> GRPC.Stream.map_error(fn
+      ...>   {:error, {:exception, _reason}} ->
+      ...>     {:error, GRPC.RPCError.exception(message: "Validation or runtime error")}
+      ...> end)
+
+  In this example:
+
+  * The call to `GRPC.Stream.map/2` raises an exception for value `2`.
+  * `map_error/3` catches the error and wraps it in a `GRPC.RPCError` struct with a custom message.
+  * The pipeline continues execution, transforming errors into structured responses.
+
+  ## Notes
+
+  - `map_error/3` is lazy and only executes when the stream is materialized
+    (via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`).
+
+  - Use this operator to implement robust error recovery, input validation, or
+    to normalize exceptions from downstream Flow stages into well-defined gRPC errors.
+  """
+  defdelegate map_error(stream, func), to: Operators
+
   @doc """
   Applies a transformation function to each stream item, passing the context as an additional argument.
   This is useful for operations that require access to the stream's headers.
@@ -386,7 +492,11 @@ defmodule GRPC.Stream do
     %__MODULE__{flow: flow, options: opts}
   end
 
-  defp send_response(from, msg) do
-    GRPC.Server.send_reply(from, msg)
+  defp send_response(from, msg, opts) do
+    dry_run? = Keyword.get(opts, :dry_run, false)
+
+    if not dry_run? do
+      GRPC.Server.send_reply(from, msg)
+    end
   end
 end
diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex
index 1f3e15bb..3404abbf 100644
--- a/lib/grpc/stream/operators.ex
+++ b/lib/grpc/stream/operators.ex
@@ -9,9 +9,9 @@ defmodule GRPC.Stream.Operators do
   @type reason :: any()
 
   @spec ask(GRPCStream.t(), pid | atom, non_neg_integer) ::
-          GRPCStream.t() | {:error, any(), :timeout | :not_alive}
+          GRPCStream.t() | {:error, :timeout | :process_not_alive}
   def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do
-    mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end
+    mapper = fn item -> safe_invoke(&do_ask(&1, target, timeout, raise_on_error: false), item) end
     %GRPCStream{stream | flow: Flow.map(flow, mapper)}
   end
 
@@ -33,7 +33,7 @@ defmodule GRPC.Stream.Operators do
         raise "Target #{inspect(target)} is not alive. Cannot send request to it."
 
       is_nil(resolved_target) ->
-        {:error, item, :not_alive}
+        {:error, :process_not_alive}
 
       true ->
         send(resolved_target, {:request, item, self()})
@@ -45,25 +45,45 @@ defmodule GRPC.Stream.Operators do
             if raise? do
               raise "Timeout waiting for response from #{inspect(target)}"
             else
-              {:error, item, :timeout}
+              {:error, :timeout}
             end
         end
     end
   end
 
+  @spec effect(GRPCStream.t(), (term -> term())) :: GRPCStream.t()
+  def effect(%GRPCStream{flow: flow} = stream, effect_fun) when is_function(effect_fun, 1) do
+    flow =
+      Flow.map(flow, fn flow_item ->
+        tap(flow_item, fn item -> safe_invoke(effect_fun, item) end)
+      end)
+
+    %GRPCStream{stream | flow: flow}
+  end
+
   @spec filter(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
   def filter(%GRPCStream{flow: flow} = stream, filter) do
-    %GRPCStream{stream | flow: Flow.filter(flow, filter)}
+    flow_wrapper = Flow.filter(flow, fn item -> safe_invoke(filter, item) end)
+    %GRPCStream{stream | flow: flow_wrapper}
   end
 
   @spec flat_map(GRPCStream.t(), (term -> Enumerable.GRPCStream.t())) :: GRPCStream.t()
   def flat_map(%GRPCStream{flow: flow} = stream, flat_mapper) do
-    %GRPCStream{stream | flow: Flow.flat_map(flow, flat_mapper)}
+    flow_wrapper =
+      Flow.flat_map(flow, fn item ->
+        case safe_invoke(flat_mapper, item) do
+          {:error, reason} -> [{:error, reason}]
+          res -> res
+        end
+      end)
+
+    %GRPCStream{stream | flow: flow_wrapper}
   end
 
   @spec map(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
   def map(%GRPCStream{flow: flow} = stream, mapper) do
-    %GRPCStream{stream | flow: Flow.map(flow, mapper)}
+    flow_wrapper = Flow.map(flow, fn item -> safe_invoke(mapper, item) end)
+    %GRPCStream{stream | flow: flow_wrapper}
   end
 
   @spec map_with_context(GRPCStream.t(), (map(), term -> term)) :: GRPCStream.t()
@@ -73,7 +93,37 @@ defmodule GRPC.Stream.Operators do
       mapper.(meta, item)
     end
 
-    %GRPCStream{stream | flow: Flow.map(flow, wrapper)}
+    flow_wrapper = Flow.map(flow, fn item -> safe_invoke(wrapper, item) end)
+
+    %GRPCStream{stream | flow: flow_wrapper}
+  end
+
+  @spec map_error(GRPCStream.t(), (reason -> term)) :: GRPCStream.t()
+  def map_error(%GRPCStream{flow: flow} = stream, func) when is_function(func, 1) do
+    mapper =
+      Flow.map(flow, fn
+        {:error, reason} -> handle_error(func, reason)
+        {:ok, value} -> value
+        other -> other
+      end)
+
+    %GRPCStream{stream | flow: mapper}
+  end
+
+  defp handle_error(func, reason) do
+    case safe_invoke(func, {:error, reason}) do
+      {:error, %GRPC.RPCError{} = rpc_error} ->
+        {:error, rpc_error}
+
+      {:error, other_reason} ->
+        {:error, GRPC.RPCError.exception(message: inspect(other_reason))}
+
+      {:ok, value} ->
+        value
+
+      other ->
+        other
+    end
   end
 
   @spec partition(GRPCStream.t(), keyword()) :: GRPCStream.t()
@@ -88,7 +138,8 @@ defmodule GRPC.Stream.Operators do
 
   @spec reject(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
   def reject(%GRPCStream{flow: flow} = stream, filter) do
-    %GRPCStream{stream | flow: Flow.reject(flow, filter)}
+    flow_wrapper = Flow.reject(flow, fn item -> safe_invoke(filter, item) end)
+    %GRPCStream{stream | flow: flow_wrapper}
   end
 
   @spec uniq(GRPCStream.t()) :: GRPCStream.t()
@@ -100,4 +151,26 @@ defmodule GRPC.Stream.Operators do
   def uniq_by(%GRPCStream{flow: flow} = stream, fun) do
     %GRPCStream{stream | flow: Flow.uniq_by(flow, fun)}
   end
+
+  # Normalizes and catches exceptions/throws.
+  # Returns:
+  #   value -> successful value
+  #   {:error, reason} -> failure
+  #   {:error, {:exception, exception}} -> failure due to exception
+  #   {:error, {kind, reason}} -> failure due to throw or exit
+  defp safe_invoke(fun, arg) do
+    res = fun.(arg)
+
+    case res do
+      {:ok, v} -> v
+      {:error, reason} -> {:error, reason}
+      other -> other
+    end
+  rescue
+    e ->
+      {:error, {:exception, e}}
+  catch
+    kind, reason ->
+      {:error, {kind, reason}}
+  end
 end
diff --git a/lib/grpc/stub.ex b/lib/grpc/stub.ex
index 715a14fd..5f7796ce 100644
--- a/lib/grpc/stub.ex
+++ b/lib/grpc/stub.ex
@@ -109,7 +109,7 @@ defmodule GRPC.Stub do
   for sending requests. Supports advanced connection resolution via the gRPC `Resolver`
   and various target schemes (`dns`, `unix`, `xds`, `host:port`, etc).
 
-  This function is part of the **connection orchestration layer**, which manages
+  This function is part of the connection orchestration layer, which manages
   connection setup, name resolution, and optional load balancing.
 
   ## Target Syntax
diff --git a/test/grpc/integration/server_test.exs b/test/grpc/integration/server_test.exs
index c72d7d5b..36ba357f 100644
--- a/test/grpc/integration/server_test.exs
+++ b/test/grpc/integration/server_test.exs
@@ -4,8 +4,12 @@ defmodule GRPC.Integration.ServerTest do
   defmodule FeatureServer do
     use GRPC.Server, service: Routeguide.RouteGuide.Service
 
-    def get_feature(point, _stream) do
-      %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
+    def get_feature(point, materializer) do
+      GRPC.Stream.unary(point, materializer: materializer)
+      |> GRPC.Stream.map(fn point ->
+        %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
+      end)
+      |> GRPC.Stream.run()
     end
 
     def route_chat(_ex_stream, stream) do
@@ -32,19 +36,25 @@ defmodule GRPC.Integration.ServerTest do
       service: Transcode.Messaging.Service,
       http_transcode: true
 
-    def get_message(msg_request, _stream) do
-      %Transcode.Message{name: msg_request.name, text: "get_message"}
+    def get_message(msg_request, stream) do
+      GRPC.Stream.unary(msg_request, materializer: stream)
+      |> GRPC.Stream.map(fn req ->
+        %Transcode.Message{name: req.name, text: "get_message"}
+      end)
+      |> GRPC.Stream.run()
     end
 
-    def stream_messages(msg_request, stream) do
-      Enum.each(1..5, fn i ->
-        msg = %Transcode.Message{
+    def stream_messages(msg_request, materializer) do
+      1..5
+      |> Stream.take(5)
+      |> GRPC.Stream.from()
+      |> GRPC.Stream.map(fn i ->
+        %Transcode.Message{
           name: msg_request.name,
           text: "#{i}"
         }
-
-        GRPC.Server.send_reply(stream, msg)
       end)
+      |> GRPC.Stream.run_with(materializer)
     end
 
     def create_message(msg, _stream) do
@@ -62,24 +72,33 @@ defmodule GRPC.Integration.ServerTest do
       msg_request.message
     end
 
-    def get_message_with_response_body(msg_request, _) do
-      %Transcode.MessageOut{
-        response: %Transcode.Message{
-          name: msg_request.name,
-          text: "get_message_with_response_body"
+    def get_message_with_response_body(msg_request, materializer) do
+      GRPC.Stream.unary(msg_request, materializer: materializer)
+      |> GRPC.Stream.map(fn req ->
+        %Transcode.MessageOut{
+          response: %Transcode.Message{
+            name: req.name,
+            text: "get_message_with_response_body"
+          }
         }
-      }
+      end)
+      |> GRPC.Stream.run()
     end
 
-    def get_message_with_query(msg_request, _stream) do
-      %Transcode.Message{name: msg_request.name, text: "get_message_with_query"}
+    def get_message_with_query(msg_request, materializer) do
+      GRPC.Stream.unary(msg_request, materializer: materializer)
+      |> GRPC.Stream.map(fn req ->
+        %Transcode.Message{name: req.name, text: "get_message_with_query"}
+      end)
+      |> GRPC.Stream.run()
     end
 
-    def get_message_with_subpath_query(msg_request, _stream) do
-      %Transcode.Message{
-        name: msg_request.message.name,
-        text: "get_message_with_subpath_query"
-      }
+    def get_message_with_subpath_query(msg_request, materializer) do
+      GRPC.Stream.unary(msg_request, materializer: materializer)
+      |> GRPC.Stream.map(fn req ->
+        %Transcode.Message{name: req.message.name, text: "get_message_with_subpath_query"}
+      end)
+      |> GRPC.Stream.run()
     end
   end
 
@@ -168,13 +187,14 @@ defmodule GRPC.Integration.ServerTest do
   defmodule SlowServer do
     use GRPC.Server, service: Routeguide.RouteGuide.Service
 
-    def list_features(rectangle, stream) do
+    def list_features(rectangle, materializer) do
       Process.sleep(400)
+      server_stream = Stream.each([rectangle.lo, rectangle.hi], fn point -> point end)
 
-      Enum.each([rectangle.lo, rectangle.hi], fn point ->
-        feature = simple_feature(point)
-        GRPC.Server.send_reply(stream, feature)
-      end)
+      server_stream
+      |> GRPC.Stream.from()
+      |> GRPC.Stream.map(&simple_feature/1)
+      |> GRPC.Stream.run_with(materializer)
     end
 
     defp simple_feature(point) do
diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs
index 279ee758..e2166233 100644
--- a/test/grpc/stream_test.exs
+++ b/test/grpc/stream_test.exs
@@ -127,7 +127,7 @@ defmodule GRPC.StreamTest do
         |> GRPC.Stream.to_flow()
         |> Enum.to_list()
 
-      assert result == [{:error, "msg", :not_alive}]
+      assert result == [{:error, :process_not_alive}]
     end
   end
 
@@ -165,6 +165,138 @@ defmodule GRPC.StreamTest do
     end
   end
 
+  describe "ask/3 error handling" do
+    test "returns timeout error if response not received in time" do
+      pid =
+        spawn_link(fn ->
+          Process.sleep(:infinity)
+        end)
+
+      result =
+        GRPC.Stream.from([:hello])
+        # very short timeout
+        |> GRPC.Stream.ask(pid, 10)
+        |> GRPC.Stream.to_flow()
+        |> Enum.to_list()
+
+      assert result == [{:error, :timeout}]
+    end
+  end
+
+  describe "safe_invoke/2 handling {:ok, value} and direct value" do
+    test "maps {:ok, value} to value" do
+      stream =
+        GRPC.Stream.from([1, 2])
+        |> GRPC.Stream.map(fn x -> {:ok, x * 10} end)
+
+      result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+      assert result == [10, 20]
+    end
+
+    test "keeps direct values as is" do
+      stream =
+        GRPC.Stream.from([1, 2])
+        |> GRPC.Stream.map(fn x -> x * 5 end)
+
+      result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+      assert result == [5, 10]
+    end
+  end
+
+  describe "safe_invoke/2 catches errors" do
+    test "map/2 handles function returning {:error, reason}" do
+      stream =
+        GRPC.Stream.from([1, 2, 3])
+        |> GRPC.Stream.map(fn
+          2 -> {:error, :fail}
+          x -> x
+        end)
+
+      results = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+      assert results == [1, {:error, :fail}, 3]
+    end
+
+    test "map/2 catches exceptions" do
+      stream =
+        GRPC.Stream.from([1, 2])
+        |> GRPC.Stream.map(fn
+          2 -> raise "boom"
+          x -> x
+        end)
+
+      results = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+      assert match?([1, {:error, {:exception, %RuntimeError{message: "boom"}}}], results)
+    end
+
+    test "flat_map/2 catches thrown values" do
+      stream =
+        GRPC.Stream.from([1, 2])
+        |> GRPC.Stream.flat_map(fn
+          2 -> throw(:fail)
+          x -> [x]
+        end)
+
+      results = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+      assert results == [1, {:error, {:throw, :fail}}]
+    end
+  end
+
+  describe "map_error/2" do
+    test "transforms {:error, reason} tuples" do
+      stream =
+        GRPC.Stream.from([{:error, :invalid_input}, {:ok, 42}, 100])
+        |> GRPC.Stream.map_error(fn
+          {:error, :invalid_input} -> {:error, :mapped_error}
+          msg -> msg
+        end)
+
+      result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+
+      assert Enum.sort(result) == [
+               42,
+               100,
+               {:error,
+                %GRPC.RPCError{
+                  __exception__: true,
+                  details: nil,
+                  message: ":mapped_error",
+                  status: nil
+                }}
+             ]
+    end
+
+    test "transforms exceptions raised inside previous map" do
+      stream =
+        GRPC.Stream.from([1, 2])
+        |> GRPC.Stream.map(fn
+          2 -> raise "boom"
+          x -> x
+        end)
+        |> GRPC.Stream.map_error(fn
+          {:error, %RuntimeError{message: "boom"}} ->
+            GRPC.RPCError.exception(message: "Validation or runtime error")
+
+          msg ->
+            msg
+        end)
+
+      result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+
+      assert match?(
+               [
+                 1,
+                 {:error,
+                  %GRPC.RPCError{
+                    status: nil,
+                    message: "{:exception, %RuntimeError{message: \"boom\"}}",
+                    details: nil
+                  }}
+               ],
+               result
+             )
+    end
+  end
+
   describe "map/2, flat_map/2, filter/2" do
     test "maps values correctly" do
       result =
@@ -197,6 +329,44 @@ defmodule GRPC.StreamTest do
     end
   end
 
+  describe "effect/2" do
+    test "applies side effects without altering values" do
+      parent = self()
+
+      result =
+        GRPC.Stream.from([1, 2, 3])
+        |> GRPC.Stream.effect(fn x -> send(parent, {:effect_called, x}) end)
+        |> GRPC.Stream.to_flow()
+        |> Enum.to_list()
+
+      assert Enum.sort(result) == [1, 2, 3]
+
+      assert_receive {:effect_called, 1}
+      assert_receive {:effect_called, 2}
+      assert_receive {:effect_called, 3}
+    end
+
+    test "continues pipeline even if effect function raises an error" do
+      parent = self()
+
+      result =
+        GRPC.Stream.from([1, 2, 3])
+        |> GRPC.Stream.effect(fn
+          2 -> raise "boom"
+          x -> send(parent, {:effect_called, x})
+        end)
+        |> GRPC.Stream.to_flow()
+        |> Enum.to_list()
+
+      # Even with error 2, the pipeline should continue and return all elements
+      assert Enum.sort(result) == [1, 2, 3]
+
+      # The effect should have been called for 1 and 3
+      assert_receive {:effect_called, 1}
+      assert_receive {:effect_called, 3}
+    end
+  end
+
   describe "test complex operations" do
     test "pipeline with all GRPC.Stream operators" do
       target =
@@ -277,19 +447,19 @@ defmodule GRPC.StreamTest do
     end
   end
 
-  defmodule MyGRPCService do
-    use GRPC.Server, service: Routeguide.RouteGuide.Service
+  describe "run/1" do
+    defmodule MyGRPCService do
+      use GRPC.Server, service: Routeguide.RouteGuide.Service
 
-    def get_feature(input, materializer) do
-      GRPC.Stream.unary(input, materializer: materializer)
-      |> GRPC.Stream.map(fn point ->
-        %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
-      end)
-      |> GRPC.Stream.run()
+      def get_feature(input, materializer) do
+        GRPC.Stream.unary(input, materializer: materializer)
+        |> GRPC.Stream.map(fn point ->
+          %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
+        end)
+        |> GRPC.Stream.run()
+      end
     end
-  end
 
-  describe "run/1" do
     test "runs a unary stream" do
       run_server([MyGRPCService], fn port ->
         point = %Routeguide.Point{latitude: 409_146_138, longitude: -746_188_906}