From 7b8161ec6af0f3adc22217c187885c3ba5fb8476 Mon Sep 17 00:00:00 2001 From: Nikko Campbell Date: Tue, 30 Aug 2022 11:59:11 -0400 Subject: [PATCH 1/6] Initial batching code Added tests and relevant fixes to batch projector added after update callback test Update dependencies docs temporary commanded version Skip over partial seen batch remove elixir_uuid after rebase Return error on partially seen batch again --- guides/Usage.md | 34 ++++ lib/projections/ecto.ex | 114 +++++++++++++- mix.exs | 2 +- .../ecto_projection_batch_test.exs | 145 ++++++++++++++++++ 4 files changed, 291 insertions(+), 4 deletions(-) create mode 100644 test/projections/ecto_projection_batch_test.exs diff --git a/guides/Usage.md b/guides/Usage.md index 7202535..0a12818 100644 --- a/guides/Usage.md +++ b/guides/Usage.md @@ -116,6 +116,35 @@ project %ItemUpdated{uuid: uuid} = event, _metadata, fn multi -> end ``` + +### Using the `project_batch` macro + +By configuring the base event handler to use batching and using the `project_batch/2` macro, you can project a batch of events in a single step within a transaction. `project_batch/2` receives a list of `{event, metadata}` tuples for all the events in the batch and a similar single-arity function as `project/3` to affect an `Ecto.Multi` structure. + +Note that there is currently no built in way to target a single type of event to be projected, and as such a single `project_batch` macro is expected to gracefully handle (or ignore) any events that it may receive + +#### Example +```elixir +defmodule MyApp.Projections.BatchProjector do + use Commanded.Projections.Ecto, + application: MyApp.Application, + repo: MyApp.Projections.Repo, + name: "example_batch_projection", + callback_handler: :batch, # This tells the underlying EventHandler to batch events + + project_batch events, fn multi -> + projections = events + |> Enum.map(fn + {%AnEvent{name: name}, _metadata} -> %{name: name} + _ -> nil + end) + |> Enum.reject(&is_nil/1) + + Ecto.Multi.insert_all(multi, :example_batch_projection, Projection, projections) + end +end +``` + ## Supervision Your projector module must be included in your application supervision tree: @@ -217,6 +246,11 @@ end You could use this function to notify subscribers that the read model has been updated (e.g. pub/sub to Phoenix channels). +### `after_update_batch/2` callback + +Similarly for batching projectors, you can define an `after_update_batch/2` callback function in a projector to be called after a batch of events has been projected. The functions receives a list of `{event, metadata}` tuples for each processed event and all changes from the `Ecto.Multi` struct + + ## Schema prefix When using a prefix for your Ecto schemas you might also want to change the prefix for the `ProjectionVersion` schema. There are a number of options to do this: diff --git a/lib/projections/ecto.ex b/lib/projections/ecto.ex index 878f361..28bc0f9 100644 --- a/lib/projections/ecto.ex +++ b/lib/projections/ecto.ex @@ -45,7 +45,7 @@ defmodule Commanded.Projections.Ecto do # Pass through any other configuration to the event handler @handler_opts Keyword.drop(@opts, [:repo, :schema_prefix, :timeout]) - unquote(__include_schema_prefix__(schema_prefix)) + unquote(__include_schema_prefix__(schema_prefix, opts)) unquote(__include_projection_version_schema__()) use Ecto.Schema @@ -107,6 +107,65 @@ defmodule Commanded.Projections.Ecto do end end + def update_projection_batch(events, multi_fn) do + {first_event, first_event_metadata} = List.first(events) + + first_event_number = Map.fetch!(first_event_metadata, :event_number) + projection_name = Map.fetch!(first_event_metadata, :handler_name) + prefix = schema_prefix(first_event, first_event_metadata) + + {_last_event, last_event_metadata} = List.last(events) + + last_event_number = Map.fetch!(last_event_metadata, :event_number) + + changeset = + %ProjectionVersion{projection_name: projection_name} + |> ProjectionVersion.changeset(%{last_seen_event_number: last_event_number}) + + multi = + Ecto.Multi.new() + |> Ecto.Multi.run(:verify_projection_version, fn repo, _changes -> + version = + case repo.get(ProjectionVersion, projection_name, prefix: prefix) do + nil -> + repo.insert!( + %ProjectionVersion{ + projection_name: projection_name, + last_seen_event_number: 0 + }, + prefix: prefix + ) + + version -> + version + end + + case version.last_seen_event_number do + last_seen when last_seen < first_event_number -> {:ok, %{version: version}} + last_seen when last_seen >= last_event_number -> {:error, :already_seen_batch} + last_seen -> {:error, {:already_seen_partial_batch, last_seen}} + end + end) + |> Ecto.Multi.update(:projection_version, changeset, prefix: prefix) + + with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]), + {:ok, changes} <- transaction(multi) do + if function_exported?(__MODULE__, :after_update_batch, 2) do + apply(__MODULE__, :after_update_batch, [events, changes]) + else + :ok + end + else + {:error, :verify_projection_version, :already_seen_batch, _changes} -> :ok + {:error, :verify_projection_version, {:already_seen_partial_batch, last_seen}, _changes} -> + {event, _metadata} = Enum.find(events, fn {event, metadata} -> metadata.event_number == last_seen end) + + {:error, :already_seen_partial_batch, event} + {:error, _stage, error, _changes} -> {:error, error} + {:error, _error} = reply -> reply + end + end + defp transaction(%Ecto.Multi{} = multi) do @repo.transaction(multi, timeout: @timeout, pool_timeout: @timeout) end @@ -117,7 +176,7 @@ defmodule Commanded.Projections.Ecto do ## User callbacks - @optional_callbacks [after_update: 3, schema_prefix: 1, schema_prefix: 2] + @optional_callbacks [after_update: 3, after_update_batch: 2, schema_prefix: 1, schema_prefix: 2] @doc """ The optional `after_update/3` callback function defined in a projector is @@ -152,6 +211,38 @@ defmodule Commanded.Projections.Ecto do @callback after_update(event :: struct, metadata :: map, changes :: Ecto.Multi.changes()) :: :ok | {:error, any} + @doc """ + The optional `after_update_batch/2` callback function defined in a projector is + called after a batch of projected events. + + The function receives the events, their metadata, and all changes from the + `Ecto.Multi` struct that were executed within the database transaction. + + ## Example + + defmodule MyApp.ExampleProjector do + use Commanded.Projections.Ecto, + application: MyApp.Application, + repo: MyApp.Projections.Repo, + name: "MyApp.ExampleProjector" + handler_callback: :batch + + project_batch events, fn multi -> + {%AnEvent{name: name}, metadata} = List.first(events) + Ecto.Multi.insert(multi, :example_projection, %ExampleProjection{name: name}) + end + + @impl Commanded.Projections.Ecto + def after_update_batch(events, changes) do + # Use the events, metadata, or `Ecto.Multi` changes and return `:ok` + :ok + end + end + + """ + @callback after_update_batch(events :: list(tuple), changes :: Ecto.Multi.changes()) :: + :ok | {:error, any} + @doc """ The optional `schema_prefix/1` callback function defined in a projector is used to set the schema of the `projection_versions` table used by the @@ -172,7 +263,10 @@ defmodule Commanded.Projections.Ecto do """ @callback schema_prefix(event :: struct(), metadata :: map()) :: String.t() | nil - defp __include_schema_prefix__(schema_prefix) do + defp __include_schema_prefix__(schema_prefix, opts \\ []) do + + is_batch_projector = opts[:handler_callback] == :batch + quote do cond do is_nil(unquote(schema_prefix)) -> @@ -183,6 +277,12 @@ defmodule Commanded.Projections.Ecto do def schema_prefix(_event), do: nil def schema_prefix(_event, _metadata), do: unquote(schema_prefix) + !is_binary(unquote(schema_prefix)) and unquote(is_batch_projector) -> + raise ArgumentError, + message: + "expected :schema_prefix option for batch projector to be a string, but got: " <> + inspect(unquote(schema_prefix)) + is_function(unquote(schema_prefix), 1) -> def schema_prefix(event), do: nil def schema_prefix(event, _metadata), do: apply(unquote(schema_prefix), [event]) @@ -291,4 +391,12 @@ defmodule Commanded.Projections.Ecto do end end end + + defmacro project_batch(events, lambda) do + quote do + def handle_batch(unquote(events) = events) do + update_projection_batch(events, unquote(lambda)) + end + end + end end diff --git a/mix.exs b/mix.exs index 6c54c68..c5d0302 100644 --- a/mix.exs +++ b/mix.exs @@ -31,7 +31,7 @@ defmodule Commanded.Projections.Ecto.Mixfile do defp deps do [ - {:commanded, "~> 1.4"}, + {:commanded, github: "calmwave-open-source/commanded", branch: "batching-support"}, {:ecto, "~> 3.11"}, {:ecto_sql, "~> 3.11"}, {:postgrex, ">= 0.0.0", only: :test}, diff --git a/test/projections/ecto_projection_batch_test.exs b/test/projections/ecto_projection_batch_test.exs new file mode 100644 index 0000000..2856b81 --- /dev/null +++ b/test/projections/ecto_projection_batch_test.exs @@ -0,0 +1,145 @@ +defmodule Commanded.Projections.EctoProjectionBatchTest do + use ExUnit.Case + + import Commanded.Projections.ProjectionAssertions + + alias Commanded.Projections.Events.{AnEvent, AnotherEvent, ErrorEvent} + alias Commanded.Projections.Projection + alias Commanded.Projections.Repo + + defmodule BatchProjector do + use Commanded.Projections.Ecto, + application: TestApplication, + name: "BatchProjector", + callback_handler: :batch + + project_batch events, fn multi -> + projections = Enum.map(events, fn + {%AnEvent{name: name}, _metadata} -> %{name: name} + {%AnotherEvent{name: name}, _metadata} -> %{name: name} + {%ErrorEvent{}, _metadata} -> :error + end) + + if Enum.any?(projections, & &1 == :error) do + Ecto.Multi.error(multi, :projection, :failure) + else + Ecto.Multi.insert_all(multi, :projection, Projection, projections) + end + end + end + + setup do + start_supervised!(TestApplication) + Ecto.Adapters.SQL.Sandbox.checkout(Repo) + end + + test "should handle multiple projected events" do + assert :ok == BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 2}} + ]) + + assert_projections(Projection, ["AnEvent", "AnEvent"]) + assert_seen_event("BatchProjector", 2) + end + + test "should handle two different types of projected events" do + assert :ok == BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnotherEvent{}, %{handler_name: "BatchProjector", event_number: 2}} + ]) + + assert_projections(Projection, ["AnEvent", "AnotherEvent"]) + assert_seen_event("BatchProjector", 2) + end + + test "should ignore already projected batch" do + assert :ok == BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}} + ]) + + assert :ok == BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}} + ]) + + assert_projections(Projection, ["AnEvent"]) + assert_seen_event("BatchProjector", 1) + end + + test "partial batch already seen should return error with last seen event" do + assert :ok == BatchProjector.handle_batch([ + {%AnEvent{name: "e1"}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}} + ]) + + assert_projections(Projection, ["e1", "e2", "e3"]) + assert_seen_event("BatchProjector", 3) + + assert {:error, :already_seen_partial_batch, %AnEvent{name: "e3"}} == BatchProjector.handle_batch([ + {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}}, + {%AnEvent{name: "e4"}, %{handler_name: "BatchProjector", event_number: 4}} + ]) + end + + test "should return an error on failure" do + assert {:error, :failure} == BatchProjector.handle_batch([ + {%ErrorEvent{}, %{handler_name: "BatchProjector", event_number: 1}} + ]) + + assert_projections(Projection, []) + end + + defmodule BatchProjectorAfterUpdateCallback do + use Commanded.Projections.Ecto, + application: TestApplication, + callback_handler: :batch + + project_batch events, fn multi -> + projections = Enum.map(events, fn + {%{name: name}, _metadata} -> %{name: name} + end) + + Ecto.Multi.insert_all(multi, :projection, Projection, projections) + end + + def after_update_batch(events, changes) do + {%{pid: pid}, _metadata} = List.first(events) + + send(pid, {:after_update_batch, length(events), changes}) + + :ok + end + end + + test "should call after_update_batch/2 callback" do + assert :ok == BatchProjectorAfterUpdateCallback.handle_batch([ + {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 3}} + ]) + + assert_receive {:after_update_batch, 3, _changes} + end + + test "should not compile if both project/2 and project_batch/1 are defined" do + assert_raise CompileError, fn -> + ast = quote do + defmodule InvalidBatchProjector do + use Commanded.Projections.Ecto, application: TestApplication + + project %AnEvent{}, _metadata, fn multi -> + multi + end + + project_batch events, fn multi -> + multi + end + end + end + + Code.eval_quoted(ast) + end + end +end From 850ef28706b301f65bcd1395baa436582462cc16 Mon Sep 17 00:00:00 2001 From: Daven Casia Date: Wed, 18 Sep 2024 15:44:24 -0600 Subject: [PATCH 2/6] Update mix lock --- mix.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mix.lock b/mix.lock index c4cc6a6..305de0f 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{ "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, - "commanded": {:hex, :commanded, "1.4.3", "1f58c58ba428248db101bedb7bcd3c2c621b35fc3169fdfa1ffb8d97b60616bf", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "040c39a073922b360d71f3db597e8f15acf7c707026248f46ddd45111d1a6c79"}, + "commanded": {:git, "https://github.com/calmwave-open-source/commanded.git", "fb46ba04bcb4a6e3b54e39f1564cdcbe61a5a333", [branch: "batching-support"]}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, @@ -11,7 +11,7 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"}, @@ -19,6 +19,6 @@ "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "postgrex": {:hex, :postgrex, "0.17.4", "5777781f80f53b7c431a001c8dad83ee167bcebcf3a793e3906efff680ab62b3", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "6458f7d5b70652bc81c3ea759f91736c16a31be000f306d3c64bcdfe9a18b3cc"}, - "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "telemetry_registry": {:hex, :telemetry_registry, "0.3.1", "14a3319a7d9027bdbff7ebcacf1a438f5f5c903057b93aee484cca26f05bdcba", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6d0ca77b691cf854ed074b459a93b87f4c7f5512f8f7743c635ca83da81f939e"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_registry": {:hex, :telemetry_registry, "0.3.2", "701576890320be6428189bff963e865e8f23e0ff3615eade8f78662be0fc003c", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7ed191eb1d115a3034af8e1e35e4e63d5348851d556646d46ca3d1b4e16bab9"}, } From 86e82fc54a815a7763f13154e3f22f523057bdb7 Mon Sep 17 00:00:00 2001 From: Daven Casia Date: Thu, 19 Sep 2024 09:52:34 -0600 Subject: [PATCH 3/6] Do not worry about partial batch for now --- lib/projections/ecto.ex | 59 ++++---- .../ecto_projection_batch_test.exs | 143 +++++++++++------- 2 files changed, 114 insertions(+), 88 deletions(-) diff --git a/lib/projections/ecto.ex b/lib/projections/ecto.ex index 28bc0f9..20e706e 100644 --- a/lib/projections/ecto.ex +++ b/lib/projections/ecto.ex @@ -118,35 +118,39 @@ defmodule Commanded.Projections.Ecto do last_event_number = Map.fetch!(last_event_metadata, :event_number) - changeset = - %ProjectionVersion{projection_name: projection_name} - |> ProjectionVersion.changeset(%{last_seen_event_number: last_event_number}) + projection_version = %ProjectionVersion{ + projection_name: projection_name, + last_seen_event_number: last_event_number + } + + # Query to update an existing projection version with the last seen event number with + # a check to ensure that the event has not already been projected. + update_projection_version = + from(pv in ProjectionVersion, + where: + pv.projection_name == ^projection_name and + pv.last_seen_event_number < ^last_event_number, + update: [set: [last_seen_event_number: ^last_event_number]] + ) multi = Ecto.Multi.new() - |> Ecto.Multi.run(:verify_projection_version, fn repo, _changes -> - version = - case repo.get(ProjectionVersion, projection_name, prefix: prefix) do - nil -> - repo.insert!( - %ProjectionVersion{ - projection_name: projection_name, - last_seen_event_number: 0 - }, - prefix: prefix - ) - - version -> - version - end - - case version.last_seen_event_number do - last_seen when last_seen < first_event_number -> {:ok, %{version: version}} - last_seen when last_seen >= last_event_number -> {:error, :already_seen_batch} - last_seen -> {:error, {:already_seen_partial_batch, last_seen}} + |> Ecto.Multi.run(:track_projection_version, fn repo, _changes -> + try do + repo.insert(projection_version, + prefix: prefix, + on_conflict: update_projection_version, + conflict_target: [:projection_name] + ) + rescue + exception in Ecto.StaleEntryError -> + # Attempted to insert a projection version for an already seen event + {:error, :already_seen_event} + + exception -> + reraise exception, __STACKTRACE__ end end) - |> Ecto.Multi.update(:projection_version, changeset, prefix: prefix) with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]), {:ok, changes} <- transaction(multi) do @@ -156,11 +160,7 @@ defmodule Commanded.Projections.Ecto do :ok end else - {:error, :verify_projection_version, :already_seen_batch, _changes} -> :ok - {:error, :verify_projection_version, {:already_seen_partial_batch, last_seen}, _changes} -> - {event, _metadata} = Enum.find(events, fn {event, metadata} -> metadata.event_number == last_seen end) - - {:error, :already_seen_partial_batch, event} + {:error, :track_projection_version, :already_seen_event, _changes} -> :ok {:error, _stage, error, _changes} -> {:error, error} {:error, _error} = reply -> reply end @@ -264,7 +264,6 @@ defmodule Commanded.Projections.Ecto do @callback schema_prefix(event :: struct(), metadata :: map()) :: String.t() | nil defp __include_schema_prefix__(schema_prefix, opts \\ []) do - is_batch_projector = opts[:handler_callback] == :batch quote do diff --git a/test/projections/ecto_projection_batch_test.exs b/test/projections/ecto_projection_batch_test.exs index 2856b81..7925691 100644 --- a/test/projections/ecto_projection_batch_test.exs +++ b/test/projections/ecto_projection_batch_test.exs @@ -13,19 +13,20 @@ defmodule Commanded.Projections.EctoProjectionBatchTest do name: "BatchProjector", callback_handler: :batch - project_batch events, fn multi -> - projections = Enum.map(events, fn - {%AnEvent{name: name}, _metadata} -> %{name: name} - {%AnotherEvent{name: name}, _metadata} -> %{name: name} - {%ErrorEvent{}, _metadata} -> :error - end) - - if Enum.any?(projections, & &1 == :error) do + project_batch(events, fn multi -> + projections = + Enum.map(events, fn + {%AnEvent{name: name}, _metadata} -> %{name: name} + {%AnotherEvent{name: name}, _metadata} -> %{name: name} + {%ErrorEvent{}, _metadata} -> :error + end) + + if Enum.any?(projections, &(&1 == :error)) do Ecto.Multi.error(multi, :projection, :failure) else Ecto.Multi.insert_all(multi, :projection, Projection, projections) end - end + end) end setup do @@ -34,59 +35,82 @@ defmodule Commanded.Projections.EctoProjectionBatchTest do end test "should handle multiple projected events" do - assert :ok == BatchProjector.handle_batch([ - {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}}, - {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 2}} - ]) + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 2}} + ]) assert_projections(Projection, ["AnEvent", "AnEvent"]) assert_seen_event("BatchProjector", 2) end test "should handle two different types of projected events" do - assert :ok == BatchProjector.handle_batch([ - {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}}, - {%AnotherEvent{}, %{handler_name: "BatchProjector", event_number: 2}} - ]) + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnotherEvent{}, %{handler_name: "BatchProjector", event_number: 2}} + ]) assert_projections(Projection, ["AnEvent", "AnotherEvent"]) assert_seen_event("BatchProjector", 2) end test "should ignore already projected batch" do - assert :ok == BatchProjector.handle_batch([ - {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}} - ]) + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}} + ]) - assert :ok == BatchProjector.handle_batch([ - {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}} - ]) + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{}, %{handler_name: "BatchProjector", event_number: 1}} + ]) assert_projections(Projection, ["AnEvent"]) assert_seen_event("BatchProjector", 1) end - test "partial batch already seen should return error with last seen event" do - assert :ok == BatchProjector.handle_batch([ - {%AnEvent{name: "e1"}, %{handler_name: "BatchProjector", event_number: 1}}, - {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, - {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}} - ]) + test "partial batch already seen should return an :ok" do + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{name: "e1"}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}} + ]) assert_projections(Projection, ["e1", "e2", "e3"]) assert_seen_event("BatchProjector", 3) - assert {:error, :already_seen_partial_batch, %AnEvent{name: "e3"}} == BatchProjector.handle_batch([ - {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, - {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}}, - {%AnEvent{name: "e4"}, %{handler_name: "BatchProjector", event_number: 4}} - ]) + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}}, + {%AnEvent{name: "e4"}, %{handler_name: "BatchProjector", event_number: 4}} + ]) + end + + test "entire batches already seen should return an :ok" do + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{name: "e1"}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}} + ]) + + assert :ok == + BatchProjector.handle_batch([ + {%AnEvent{name: "e1"}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{name: "e2"}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{name: "e3"}, %{handler_name: "BatchProjector", event_number: 3}} + ]) end test "should return an error on failure" do - assert {:error, :failure} == BatchProjector.handle_batch([ - {%ErrorEvent{}, %{handler_name: "BatchProjector", event_number: 1}} - ]) + assert {:error, :failure} == + BatchProjector.handle_batch([ + {%ErrorEvent{}, %{handler_name: "BatchProjector", event_number: 1}} + ]) assert_projections(Projection, []) end @@ -96,13 +120,14 @@ defmodule Commanded.Projections.EctoProjectionBatchTest do application: TestApplication, callback_handler: :batch - project_batch events, fn multi -> - projections = Enum.map(events, fn - {%{name: name}, _metadata} -> %{name: name} - end) + project_batch(events, fn multi -> + projections = + Enum.map(events, fn + {%{name: name}, _metadata} -> %{name: name} + end) Ecto.Multi.insert_all(multi, :projection, Projection, projections) - end + end) def after_update_batch(events, changes) do {%{pid: pid}, _metadata} = List.first(events) @@ -114,30 +139,32 @@ defmodule Commanded.Projections.EctoProjectionBatchTest do end test "should call after_update_batch/2 callback" do - assert :ok == BatchProjectorAfterUpdateCallback.handle_batch([ - {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 1}}, - {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 2}}, - {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 3}} - ]) + assert :ok == + BatchProjectorAfterUpdateCallback.handle_batch([ + {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 1}}, + {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 2}}, + {%AnEvent{pid: self()}, %{handler_name: "BatchProjector", event_number: 3}} + ]) assert_receive {:after_update_batch, 3, _changes} end test "should not compile if both project/2 and project_batch/1 are defined" do assert_raise CompileError, fn -> - ast = quote do - defmodule InvalidBatchProjector do - use Commanded.Projections.Ecto, application: TestApplication - - project %AnEvent{}, _metadata, fn multi -> - multi - end - - project_batch events, fn multi -> - multi + ast = + quote do + defmodule InvalidBatchProjector do + use Commanded.Projections.Ecto, application: TestApplication + + project %AnEvent{}, _metadata, fn multi -> + multi + end + + project_batch(events, fn multi -> + multi + end) end end - end Code.eval_quoted(ast) end From dbe192584ca46e41f667abc70d600e01ab5e80e8 Mon Sep 17 00:00:00 2001 From: Daven Casia Date: Fri, 20 Sep 2024 13:12:35 -0600 Subject: [PATCH 4/6] Pattern match for list --- lib/projections/ecto.ex | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/projections/ecto.ex b/lib/projections/ecto.ex index 20e706e..cd98026 100644 --- a/lib/projections/ecto.ex +++ b/lib/projections/ecto.ex @@ -107,16 +107,14 @@ defmodule Commanded.Projections.Ecto do end end - def update_projection_batch(events, multi_fn) do + def update_projection_batch([{first_event, first_event_metadata} | _] = events, multi_fn) do {first_event, first_event_metadata} = List.first(events) - - first_event_number = Map.fetch!(first_event_metadata, :event_number) - projection_name = Map.fetch!(first_event_metadata, :handler_name) - prefix = schema_prefix(first_event, first_event_metadata) + %{event_number: first_event_number, handler_name: projection_name} = first_event_metadata {_last_event, last_event_metadata} = List.last(events) + %{event_number: last_event_number} = last_event_metadata - last_event_number = Map.fetch!(last_event_metadata, :event_number) + prefix = schema_prefix(first_event, first_event_metadata) projection_version = %ProjectionVersion{ projection_name: projection_name, From e1281ff833439efa5de2406241a2969de2109ae1 Mon Sep 17 00:00:00 2001 From: Daven Casia Date: Fri, 20 Sep 2024 13:20:43 -0600 Subject: [PATCH 5/6] Update the documentation --- guides/Usage.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/guides/Usage.md b/guides/Usage.md index 0a12818..ed779e0 100644 --- a/guides/Usage.md +++ b/guides/Usage.md @@ -119,7 +119,7 @@ end ### Using the `project_batch` macro -By configuring the base event handler to use batching and using the `project_batch/2` macro, you can project a batch of events in a single step within a transaction. `project_batch/2` receives a list of `{event, metadata}` tuples for all the events in the batch and a similar single-arity function as `project/3` to affect an `Ecto.Multi` structure. +You can use `project_batch` to receive events in batches. To enable batching, you need to set the `batch_size` and use the `project_batch/2` macro. `project_batch/2` receives a list of `{event, metadata}` tuples for all the events in the batch and a similar single-arity function as `project/3` to affect an `Ecto.Multi` structure. Note that there is currently no built in way to target a single type of event to be projected, and as such a single `project_batch` macro is expected to gracefully handle (or ignore) any events that it may receive @@ -130,7 +130,7 @@ defmodule MyApp.Projections.BatchProjector do application: MyApp.Application, repo: MyApp.Projections.Repo, name: "example_batch_projection", - callback_handler: :batch, # This tells the underlying EventHandler to batch events + batch_size: 10 project_batch events, fn multi -> projections = events From 40ad2fbe8a8625a08c7b35efec3905a706472ffd Mon Sep 17 00:00:00 2001 From: Daven Casia Date: Fri, 20 Sep 2024 15:17:06 -0600 Subject: [PATCH 6/6] Remove unused line --- lib/projections/ecto.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/projections/ecto.ex b/lib/projections/ecto.ex index cd98026..ae13114 100644 --- a/lib/projections/ecto.ex +++ b/lib/projections/ecto.ex @@ -108,7 +108,6 @@ defmodule Commanded.Projections.Ecto do end def update_projection_batch([{first_event, first_event_metadata} | _] = events, multi_fn) do - {first_event, first_event_metadata} = List.first(events) %{event_number: first_event_number, handler_name: projection_name} = first_event_metadata {_last_event, last_event_metadata} = List.last(events)