Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions guides/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,35 @@ project %ItemUpdated{uuid: uuid} = event, _metadata, fn multi ->
end
```


### Using the `project_batch` macro

You can use `project_batch` to receive events in batches. To enable batching, you need to set the `batch_size` and use the `project_batch/2` macro. `project_batch/2` receives a list of `{event, metadata}` tuples for all the events in the batch and a similar single-arity function as `project/3` to affect an `Ecto.Multi` structure.

Note that there is currently no built in way to target a single type of event to be projected, and as such a single `project_batch` macro is expected to gracefully handle (or ignore) any events that it may receive

#### Example
```elixir
defmodule MyApp.Projections.BatchProjector do
use Commanded.Projections.Ecto,
application: MyApp.Application,
repo: MyApp.Projections.Repo,
name: "example_batch_projection",
batch_size: 10

project_batch events, fn multi ->
projections = events
|> Enum.map(fn
{%AnEvent{name: name}, _metadata} -> %{name: name}
_ -> nil
end)
|> Enum.reject(&is_nil/1)

Ecto.Multi.insert_all(multi, :example_batch_projection, Projection, projections)
end
end
```

## Supervision

Your projector module must be included in your application supervision tree:
Expand Down Expand Up @@ -217,6 +246,11 @@ end

You could use this function to notify subscribers that the read model has been updated (e.g. pub/sub to Phoenix channels).

### `after_update_batch/2` callback

Similarly for batching projectors, you can define an `after_update_batch/2` callback function in a projector to be called after a batch of events has been projected. The functions receives a list of `{event, metadata}` tuples for each processed event and all changes from the `Ecto.Multi` struct


## Schema prefix

When using a prefix for your Ecto schemas you might also want to change the prefix for the `ProjectionVersion` schema. There are a number of options to do this:
Expand Down
110 changes: 107 additions & 3 deletions lib/projections/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Commanded.Projections.Ecto do
# Pass through any other configuration to the event handler
@handler_opts Keyword.drop(@opts, [:repo, :schema_prefix, :timeout])

unquote(__include_schema_prefix__(schema_prefix))
unquote(__include_schema_prefix__(schema_prefix, opts))
unquote(__include_projection_version_schema__())

use Ecto.Schema
Expand Down Expand Up @@ -107,6 +107,62 @@ defmodule Commanded.Projections.Ecto do
end
end

def update_projection_batch([{first_event, first_event_metadata} | _] = events, multi_fn) do
%{event_number: first_event_number, handler_name: projection_name} = first_event_metadata

{_last_event, last_event_metadata} = List.last(events)
%{event_number: last_event_number} = last_event_metadata

prefix = schema_prefix(first_event, first_event_metadata)

projection_version = %ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: last_event_number
}

# Query to update an existing projection version with the last seen event number with
# a check to ensure that the event has not already been projected.
update_projection_version =
from(pv in ProjectionVersion,
where:
pv.projection_name == ^projection_name and
pv.last_seen_event_number < ^last_event_number,
update: [set: [last_seen_event_number: ^last_event_number]]
)

multi =
Ecto.Multi.new()
|> Ecto.Multi.run(:track_projection_version, fn repo, _changes ->
try do
repo.insert(projection_version,
prefix: prefix,
on_conflict: update_projection_version,
conflict_target: [:projection_name]
)
rescue
exception in Ecto.StaleEntryError ->
# Attempted to insert a projection version for an already seen event
{:error, :already_seen_event}

exception ->
reraise exception, __STACKTRACE__
end
end)

with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]),
{:ok, changes} <- transaction(multi) do
if function_exported?(__MODULE__, :after_update_batch, 2) do
apply(__MODULE__, :after_update_batch, [events, changes])
else
:ok
end
else
{:error, :track_projection_version, :already_seen_event, _changes} -> :ok
{:error, _stage, error, _changes} -> {:error, error}
{:error, _error} = reply -> reply
end
end

defp transaction(%Ecto.Multi{} = multi) do
@repo.transaction(multi, timeout: @timeout, pool_timeout: @timeout)
end
Expand All @@ -117,7 +173,7 @@ defmodule Commanded.Projections.Ecto do

## User callbacks

@optional_callbacks [after_update: 3, schema_prefix: 1, schema_prefix: 2]
@optional_callbacks [after_update: 3, after_update_batch: 2, schema_prefix: 1, schema_prefix: 2]

@doc """
The optional `after_update/3` callback function defined in a projector is
Expand Down Expand Up @@ -152,6 +208,38 @@ defmodule Commanded.Projections.Ecto do
@callback after_update(event :: struct, metadata :: map, changes :: Ecto.Multi.changes()) ::
:ok | {:error, any}

@doc """
The optional `after_update_batch/2` callback function defined in a projector is
called after a batch of projected events.

The function receives the events, their metadata, and all changes from the
`Ecto.Multi` struct that were executed within the database transaction.

## Example

defmodule MyApp.ExampleProjector do
use Commanded.Projections.Ecto,
application: MyApp.Application,
repo: MyApp.Projections.Repo,
name: "MyApp.ExampleProjector"
handler_callback: :batch

project_batch events, fn multi ->
{%AnEvent{name: name}, metadata} = List.first(events)
Ecto.Multi.insert(multi, :example_projection, %ExampleProjection{name: name})
end

@impl Commanded.Projections.Ecto
def after_update_batch(events, changes) do
# Use the events, metadata, or `Ecto.Multi` changes and return `:ok`
:ok
end
end

"""
@callback after_update_batch(events :: list(tuple), changes :: Ecto.Multi.changes()) ::
:ok | {:error, any}

@doc """
The optional `schema_prefix/1` callback function defined in a projector is
used to set the schema of the `projection_versions` table used by the
Expand All @@ -172,7 +260,9 @@ defmodule Commanded.Projections.Ecto do
"""
@callback schema_prefix(event :: struct(), metadata :: map()) :: String.t() | nil

defp __include_schema_prefix__(schema_prefix) do
defp __include_schema_prefix__(schema_prefix, opts \\ []) do
is_batch_projector = opts[:handler_callback] == :batch

quote do
cond do
is_nil(unquote(schema_prefix)) ->
Expand All @@ -183,6 +273,12 @@ defmodule Commanded.Projections.Ecto do
def schema_prefix(_event), do: nil
def schema_prefix(_event, _metadata), do: unquote(schema_prefix)

!is_binary(unquote(schema_prefix)) and unquote(is_batch_projector) ->
raise ArgumentError,
message:
"expected :schema_prefix option for batch projector to be a string, but got: " <>
inspect(unquote(schema_prefix))

is_function(unquote(schema_prefix), 1) ->
def schema_prefix(event), do: nil
def schema_prefix(event, _metadata), do: apply(unquote(schema_prefix), [event])
Expand Down Expand Up @@ -291,4 +387,12 @@ defmodule Commanded.Projections.Ecto do
end
end
end

defmacro project_batch(events, lambda) do
quote do
def handle_batch(unquote(events) = events) do
update_projection_batch(events, unquote(lambda))
end
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Commanded.Projections.Ecto.Mixfile do

defp deps do
[
{:commanded, "~> 1.4"},
{:commanded, github: "calmwave-open-source/commanded", branch: "batching-support"},
{:ecto, "~> 3.11"},
{:ecto_sql, "~> 3.11"},
{:postgrex, ">= 0.0.0", only: :test},
Expand Down
8 changes: 4 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%{
"backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"},
"commanded": {:hex, :commanded, "1.4.3", "1f58c58ba428248db101bedb7bcd3c2c621b35fc3169fdfa1ffb8d97b60616bf", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "040c39a073922b360d71f3db597e8f15acf7c707026248f46ddd45111d1a6c79"},
"commanded": {:git, "https://github.com/calmwave-open-source/commanded.git", "fb46ba04bcb4a6e3b54e39f1564cdcbe61a5a333", [branch: "batching-support"]},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
Expand All @@ -11,14 +11,14 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"},
"mix_test_watch": {:hex, :mix_test_watch, "1.1.1", "eee6fc570d77ad6851c7bc08de420a47fd1e449ef5ccfa6a77ef68b72e7e51ad", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f82262b54dee533467021723892e15c3267349849f1f737526523ecba4e6baae"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"telemetry_registry": {:hex, :telemetry_registry, "0.3.1", "14a3319a7d9027bdbff7ebcacf1a438f5f5c903057b93aee484cca26f05bdcba", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6d0ca77b691cf854ed074b459a93b87f4c7f5512f8f7743c635ca83da81f939e"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
"telemetry_registry": {:hex, :telemetry_registry, "0.3.2", "701576890320be6428189bff963e865e8f23e0ff3615eade8f78662be0fc003c", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7ed191eb1d115a3034af8e1e35e4e63d5348851d556646d46ca3d1b4e16bab9"},
}
Loading
Loading