Skip to content

Commit b8dd5f3

Browse files
authored
feat: selective broadcast for postgres changes (#1573)
Selective broadcast for Postgres Changes. We use the fact that we know where each subscription subscribed from and do a direct broadcast for each node instead of spamming the whole cluster. If the subscription node is not readily available (for whatever reason) we fallback to sending to the whole cluster.
1 parent 05ec589 commit b8dd5f3

17 files changed

+863
-82
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
2121
alias Realtime.RateCounter
2222
alias Realtime.Tenants
2323

24+
alias RealtimeWeb.TenantBroadcaster
25+
2426
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
2527

2628
@impl true
2729
def init(args) do
2830
tenant_id = args["id"]
2931
Logger.metadata(external_id: tenant_id, project: tenant_id)
3032

31-
%Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)
32-
3333
rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)
3434

3535
RateCounter.new(rate_counter_args)
@@ -50,7 +50,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
5050
retry_count: 0,
5151
slot_name: args["slot_name"] <> slot_name_suffix(),
5252
tenant_id: tenant_id,
53-
rate_counter_args: rate_counter_args
53+
rate_counter_args: rate_counter_args,
54+
subscribers_nodes_table: args["subscribers_nodes_table"]
5455
}
5556

5657
{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
@@ -84,6 +85,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
8485
max_changes: max_changes,
8586
conn: conn,
8687
tenant_id: tenant_id,
88+
subscribers_nodes_table: subscribers_nodes_table,
8789
rate_counter_args: rate_counter_args
8890
} = state
8991
) do
@@ -94,7 +96,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
9496
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
9597
record_list_changes_telemetry(time, tenant_id)
9698

97-
case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
99+
case handle_list_changes_result(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
98100
{:ok, row_count} ->
99101
Backoff.reset(backoff)
100102

@@ -187,6 +189,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
187189
rows: [_ | _] = rows,
188190
num_rows: rows_count
189191
}},
192+
subscribers_nodes_table,
190193
tenant_id,
191194
rate_counter_args
192195
) do
@@ -201,15 +204,47 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
201204
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
202205
topic = "realtime:postgres:" <> tenant_id
203206

204-
RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
207+
case collect_subscription_nodes(subscribers_nodes_table, change.subscription_ids) do
208+
{:ok, nodes} ->
209+
for node <- nodes do
210+
TenantBroadcaster.pubsub_direct_broadcast(
211+
node,
212+
tenant_id,
213+
topic,
214+
change,
215+
MessageDispatcher,
216+
:postgres_changes
217+
)
218+
end
219+
220+
{:error, :node_not_found} ->
221+
TenantBroadcaster.pubsub_broadcast(
222+
tenant_id,
223+
topic,
224+
change,
225+
MessageDispatcher,
226+
:postgres_changes
227+
)
228+
end
205229
end
206230
end
207231

208232
{:ok, rows_count}
209233
end
210234

211-
defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
212-
defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}
235+
defp handle_list_changes_result({:ok, _}, _, _, _), do: {:ok, 0}
236+
defp handle_list_changes_result({:error, reason}, _, _, _), do: {:error, reason}
237+
238+
defp collect_subscription_nodes(subscribers_nodes_table, subscription_ids) do
239+
Enum.reduce_while(subscription_ids, {:ok, MapSet.new()}, fn subscription_id, {:ok, acc} ->
240+
case :ets.lookup(subscribers_nodes_table, subscription_id) do
241+
[{_, node}] -> {:cont, {:ok, MapSet.put(acc, node)}}
242+
_ -> {:halt, {:error, :node_not_found}}
243+
end
244+
end)
245+
rescue
246+
_ -> {:error, :node_not_found}
247+
end
213248

214249
def generate_record([
215250
{"wal",

lib/extensions/postgres_cdc_rls/subscription_manager.ex

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
2424
defstruct [
2525
:id,
2626
:publication,
27-
:subscribers_tid,
27+
:subscribers_pids_table,
28+
:subscribers_nodes_table,
2829
:conn,
2930
:delete_queue,
3031
:no_users_ref,
@@ -37,7 +38,8 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
3738
@type t :: %__MODULE__{
3839
id: String.t(),
3940
publication: String.t(),
40-
subscribers_tid: :ets.tid(),
41+
subscribers_pids_table: :ets.tid(),
42+
subscribers_nodes_table: :ets.tid(),
4143
conn: Postgrex.conn(),
4244
oids: map(),
4345
check_oid_ref: reference() | nil,
@@ -67,7 +69,12 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
6769

6870
@impl true
6971
def handle_continue({:connect, args}, _) do
70-
%{"id" => id, "publication" => publication, "subscribers_tid" => subscribers_tid} = args
72+
%{
73+
"id" => id,
74+
"publication" => publication,
75+
"subscribers_pids_table" => subscribers_pids_table,
76+
"subscribers_nodes_table" => subscribers_nodes_table
77+
} = args
7178

7279
subscription_manager_settings = Database.from_settings(args, "realtime_subscription_manager")
7380

@@ -85,31 +92,35 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
8592
check_region_interval = Map.get(args, :check_region_interval, rebalance_check_interval_in_ms())
8693
send_region_check_message(check_region_interval)
8794

88-
state = %State{
89-
id: id,
90-
conn: conn,
91-
publication: publication,
92-
subscribers_tid: subscribers_tid,
93-
oids: oids,
94-
delete_queue: %{
95-
ref: check_delete_queue(),
96-
queue: :queue.new()
97-
},
98-
no_users_ref: check_no_users(),
99-
check_region_interval: check_region_interval
100-
}
95+
state =
96+
%State{
97+
id: id,
98+
conn: conn,
99+
publication: publication,
100+
subscribers_pids_table: subscribers_pids_table,
101+
subscribers_nodes_table: subscribers_nodes_table,
102+
oids: oids,
103+
delete_queue: %{
104+
ref: check_delete_queue(),
105+
queue: :queue.new()
106+
},
107+
no_users_ref: check_no_users(),
108+
check_region_interval: check_region_interval
109+
}
101110

102111
send(self(), :check_oids)
103112
{:noreply, state}
104113
end
105114

106115
@impl true
107116
def handle_info({:subscribed, {pid, id}}, state) do
108-
case :ets.match(state.subscribers_tid, {pid, id, :"$1", :_}) do
109-
[] -> :ets.insert(state.subscribers_tid, {pid, id, Process.monitor(pid), node(pid)})
117+
case :ets.match(state.subscribers_pids_table, {pid, id, :"$1", :_}) do
118+
[] -> :ets.insert(state.subscribers_pids_table, {pid, id, Process.monitor(pid), node(pid)})
110119
_ -> :ok
111120
end
112121

122+
:ets.insert(state.subscribers_nodes_table, {UUID.string_to_binary!(id), node(pid)})
123+
113124
{:noreply, %{state | no_users_ts: nil}}
114125
end
115126

@@ -132,7 +143,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
132143
Process.demonitor(ref, [:flush])
133144
send(pid, :postgres_subscribe)
134145
end
135-
|> :ets.foldl([], state.subscribers_tid)
146+
|> :ets.foldl([], state.subscribers_pids_table)
136147

137148
new_oids
138149
end
@@ -142,19 +153,25 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
142153

143154
def handle_info(
144155
{:DOWN, _ref, :process, pid, _reason},
145-
%State{subscribers_tid: tid, delete_queue: %{queue: q}} = state
156+
%State{
157+
subscribers_pids_table: subscribers_pids_table,
158+
subscribers_nodes_table: subscribers_nodes_table,
159+
delete_queue: %{queue: q}
160+
} = state
146161
) do
147162
q1 =
148-
case :ets.take(tid, pid) do
163+
case :ets.take(subscribers_pids_table, pid) do
149164
[] ->
150165
q
151166

152167
values ->
153168
for {_pid, id, _ref, _node} <- values, reduce: q do
154169
acc ->
155-
id
156-
|> UUID.string_to_binary!()
157-
|> :queue.in(acc)
170+
bin_id = UUID.string_to_binary!(id)
171+
172+
:ets.delete(subscribers_nodes_table, bin_id)
173+
174+
:queue.in(bin_id, acc)
158175
end
159176
end
160177

@@ -187,7 +204,7 @@ defmodule Extensions.PostgresCdcRls.SubscriptionManager do
187204
{:noreply, %{state | delete_queue: %{ref: ref, queue: q1}}}
188205
end
189206

190-
def handle_info(:check_no_users, %{subscribers_tid: tid, no_users_ts: ts} = state) do
207+
def handle_info(:check_no_users, %{subscribers_pids_table: tid, no_users_ts: ts} = state) do
191208
Helpers.cancel_timer(state.no_users_ref)
192209

193210
ts_new =

lib/extensions/postgres_cdc_rls/subscriptions_checker.ex

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
1717

1818
defmodule State do
1919
@moduledoc false
20-
defstruct [:id, :conn, :check_active_pids, :subscribers_tid, :delete_queue]
20+
defstruct [:id, :conn, :check_active_pids, :subscribers_pids_table, :subscribers_nodes_table, :delete_queue]
2121

2222
@type t :: %__MODULE__{
2323
id: String.t(),
2424
conn: Postgrex.conn(),
2525
check_active_pids: reference(),
26-
subscribers_tid: :ets.tid(),
26+
subscribers_pids_table: :ets.tid(),
27+
subscribers_nodes_table: :ets.tid(),
2728
delete_queue: %{
2829
ref: reference(),
2930
queue: :queue.queue()
@@ -47,7 +48,11 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
4748

4849
@impl true
4950
def handle_continue({:connect, args}, _) do
50-
%{"id" => id, "subscribers_tid" => subscribers_tid} = args
51+
%{
52+
"id" => id,
53+
"subscribers_pids_table" => subscribers_pids_table,
54+
"subscribers_nodes_table" => subscribers_nodes_table
55+
} = args
5156

5257
realtime_subscription_checker_settings =
5358
Database.from_settings(args, "realtime_subscription_checker")
@@ -58,7 +63,8 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
5863
id: id,
5964
conn: conn,
6065
check_active_pids: check_active_pids(),
61-
subscribers_tid: subscribers_tid,
66+
subscribers_pids_table: subscribers_pids_table,
67+
subscribers_nodes_table: subscribers_nodes_table,
6268
delete_queue: %{
6369
ref: nil,
6470
queue: :queue.new()
@@ -69,18 +75,14 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
6975
end
7076

7177
@impl true
72-
def handle_info(
73-
:check_active_pids,
74-
%State{check_active_pids: ref, subscribers_tid: tid, delete_queue: delete_queue, id: id} =
75-
state
76-
) do
78+
def handle_info(:check_active_pids, %State{check_active_pids: ref, delete_queue: delete_queue, id: id} = state) do
7779
Helpers.cancel_timer(ref)
7880

7981
ids =
80-
tid
82+
state.subscribers_pids_table
8183
|> subscribers_by_node()
8284
|> not_alive_pids_dist()
83-
|> pop_not_alive_pids(tid, id)
85+
|> pop_not_alive_pids(state.subscribers_pids_table, state.subscribers_nodes_table, id)
8486

8587
new_delete_queue =
8688
if length(ids) > 0 do
@@ -128,10 +130,10 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
128130

129131
## Internal functions
130132

131-
@spec pop_not_alive_pids([pid()], :ets.tid(), binary()) :: [Ecto.UUID.t()]
132-
def pop_not_alive_pids(pids, tid, tenant_id) do
133+
@spec pop_not_alive_pids([pid()], :ets.tid(), :ets.tid(), binary()) :: [Ecto.UUID.t()]
134+
def pop_not_alive_pids(pids, subscribers_pids_table, subscribers_nodes_table, tenant_id) do
133135
Enum.reduce(pids, [], fn pid, acc ->
134-
case :ets.lookup(tid, pid) do
136+
case :ets.lookup(subscribers_pids_table, pid) do
135137
[] ->
136138
Telemetry.execute(
137139
[:realtime, :subscriptions_checker, :pid_not_found],
@@ -149,8 +151,11 @@ defmodule Extensions.PostgresCdcRls.SubscriptionsChecker do
149151
%{tenant_id: tenant_id}
150152
)
151153

152-
:ets.delete(tid, pid)
153-
UUID.string_to_binary!(postgres_id)
154+
:ets.delete(subscribers_pids_table, pid)
155+
bin_id = UUID.string_to_binary!(postgres_id)
156+
157+
:ets.delete(subscribers_nodes_table, bin_id)
158+
bin_id
154159
end ++ acc
155160
end
156161
end)

lib/extensions/postgres_cdc_rls/worker_supervisor.ex

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,19 @@ defmodule Extensions.PostgresCdcRls.WorkerSupervisor do
1919
Logger.metadata(external_id: tenant, project: tenant)
2020
unless Api.get_tenant_by_external_id(tenant, :primary), do: raise(Exception)
2121

22-
tid_args = Map.merge(args, %{"subscribers_tid" => :ets.new(__MODULE__, [:public, :bag])})
22+
subscribers_pids_table = :ets.new(__MODULE__, [:public, :bag])
23+
subscribers_nodes_table = :ets.new(__MODULE__, [:public, :set])
24+
25+
tid_args =
26+
Map.merge(args, %{
27+
"subscribers_pids_table" => subscribers_pids_table,
28+
"subscribers_nodes_table" => subscribers_nodes_table
29+
})
2330

2431
children = [
2532
%{
2633
id: ReplicationPoller,
27-
start: {ReplicationPoller, :start_link, [args]},
34+
start: {ReplicationPoller, :start_link, [tid_args]},
2835
restart: :transient
2936
},
3037
%{

lib/realtime/gen_rpc.ex

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,36 @@ defmodule Realtime.GenRpc do
2626
:ok
2727
end
2828

29+
@doc """
30+
Fire and forget apply(mod, func, args) on one node
31+
32+
Options:
33+
34+
- `:key` - Optional key to consistently select the same gen_rpc client to guarantee some message order between nodes
35+
"""
36+
@spec cast(node, module, atom, list(any), keyword()) :: :ok
37+
def cast(node, mod, func, args, opts \\ [])
38+
39+
# Local
40+
def cast(node, mod, func, args, _opts) when node == node() do
41+
:erpc.cast(node, mod, func, args)
42+
:ok
43+
end
44+
45+
def cast(node, mod, func, args, opts)
46+
when is_atom(node) and is_atom(mod) and is_atom(func) and is_list(args) and is_list(opts) do
47+
key = Keyword.get(opts, :key, nil)
48+
49+
# Ensure this node is part of the connected nodes
50+
if node in Node.list() do
51+
node_key = rpc_node(node, key)
52+
53+
:gen_rpc.cast(node_key, mod, func, args)
54+
end
55+
56+
:ok
57+
end
58+
2959
@doc """
3060
Fire and forget apply(mod, func, args) on all nodes
3161

0 commit comments

Comments
 (0)