Skip to content

Commit a8a39c2

Browse files
committed
fix: provide microsecond granularity by using binary pgoutput mode
1 parent b8dd5f3 commit a8a39c2

File tree

4 files changed

+39
-33
lines changed

4 files changed

+39
-33
lines changed

lib/realtime/adapters/postgres/decoder.ex

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -218,19 +218,13 @@ defmodule Realtime.Adapters.Postgres.Decoder do
218218
defp decode_message_impl(<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
219219
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
220220

221-
%Insert{
222-
relation_id: relation_id,
223-
tuple_data: decoded_tuple_data
224-
}
221+
%Insert{relation_id: relation_id, tuple_data: decoded_tuple_data}
225222
end
226223

227224
defp decode_message_impl(<<"U", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
228225
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
229226

230-
%Update{
231-
relation_id: relation_id,
232-
tuple_data: decoded_tuple_data
233-
}
227+
%Update{relation_id: relation_id, tuple_data: decoded_tuple_data}
234228
end
235229

236230
defp decode_message_impl(
@@ -242,10 +236,7 @@ defmodule Realtime.Adapters.Postgres.Decoder do
242236

243237
{<<>>, decoded_tuple_data} = decode_tuple_data(new_tuple_binary, new_number_of_columns)
244238

245-
base_update_msg = %Update{
246-
relation_id: relation_id,
247-
tuple_data: decoded_tuple_data
248-
}
239+
base_update_msg = %Update{relation_id: relation_id, tuple_data: decoded_tuple_data}
249240

250241
case key_or_old do
251242
"K" -> Map.put(base_update_msg, :changed_key_tuple_data, old_decoded_tuple_data)
@@ -259,9 +250,7 @@ defmodule Realtime.Adapters.Postgres.Decoder do
259250
when key_or_old == "K" or key_or_old == "O" do
260251
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)
261252

262-
base_delete_msg = %Delete{
263-
relation_id: relation_id
264-
}
253+
base_delete_msg = %Delete{relation_id: relation_id}
265254

266255
case key_or_old do
267256
"K" -> Map.put(base_delete_msg, :changed_key_tuple_data, decoded_tuple_data)
@@ -313,19 +302,39 @@ defmodule Realtime.Adapters.Postgres.Decoder do
313302
defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, accumulator),
314303
do: decode_tuple_data(rest, columns_remaining - 1, [:unchanged_toast | accumulator])
315304

305+
@start_date "2000-01-01T00:00:00Z"
316306
defp decode_tuple_data(
317-
<<"t", column_length::integer-32, rest::binary>>,
307+
<<"b", column_length::integer-32, rest::binary>>,
318308
columns_remaining,
319309
accumulator
320-
),
321-
do:
322-
decode_tuple_data(
323-
:erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)}),
324-
columns_remaining - 1,
325-
[
326-
:erlang.binary_part(rest, {0, column_length}) | accumulator
327-
]
328-
)
310+
) do
311+
data = :erlang.binary_part(rest, {0, column_length})
312+
remainder = :erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)})
313+
314+
data =
315+
case data do
316+
<<1>> ->
317+
true
318+
319+
<<0>> ->
320+
false
321+
322+
<<1, binary::binary-size(column_length - 1)>> ->
323+
binary
324+
325+
<<microseconds::signed-big-64>> ->
326+
NaiveDateTime.from_iso8601!(@start_date)
327+
|> NaiveDateTime.add(microseconds, :microsecond)
328+
329+
<<uuid_binary::binary-16>> ->
330+
UUID.binary_to_string!(uuid_binary)
331+
332+
data when is_binary(data) ->
333+
data
334+
end
335+
336+
decode_tuple_data(remainder, columns_remaining - 1, [data | accumulator])
337+
end
329338

330339
defp decode_columns(binary, accumulator \\ [])
331340
defp decode_columns(<<>>, accumulator), do: Enum.reverse(accumulator)
@@ -345,7 +354,6 @@ defmodule Realtime.Adapters.Postgres.Decoder do
345354
name: name,
346355
flags: decoded_flags,
347356
type: OidDatabase.name_for_type_id(data_type_id),
348-
# type: data_type_id,
349357
type_modifier: type_modifier
350358
}
351359
| accumulator

lib/realtime/api/message.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ defmodule Realtime.Api.Message do
1717
field(:event, :string)
1818
field(:private, :boolean)
1919

20-
timestamps()
20+
timestamps(type: :naive_datetime_usec)
2121
end
2222

2323
def changeset(message, attrs) do

lib/realtime/monitoring/prom_ex/plugins/tenant.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ defmodule Realtime.PromEx.Plugins.Tenant do
245245
unit: :second,
246246
description: "Latency of database inserted_at until reaches server to be broadcasted",
247247
tags: [:tenant],
248-
reporter_options: [buckets: [1, 2, 5]]
248+
reporter_options: [buckets: [10, 250, 5000]]
249249
)
250250
]
251251
)

lib/realtime/tenants/replication_connection.ex

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
5757
publication_name: nil,
5858
replication_slot_name: nil,
5959
output_plugin: "pgoutput",
60-
proto_version: 1,
60+
proto_version: 3,
6161
relations: %{},
6262
buffer: [],
6363
monitored_pid: nil,
@@ -236,7 +236,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
236236
)
237237

238238
query =
239-
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}')"
239+
"START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}', binary 'true')"
240240

241241
{:stream, query, [], %{state | step: :streaming}}
242242
end
@@ -318,8 +318,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
318318
payload: Map.put_new(payload, "id", id)
319319
},
320320
:ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
321-
inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
322-
latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at)
321+
latency_inserted_at = NaiveDateTime.utc_now(:microsecond) |> NaiveDateTime.diff(inserted_at)
323322

324323
Telemetry.execute(
325324
[:realtime, :tenants, :broadcast_from_database],
@@ -377,7 +376,6 @@ defmodule Realtime.Tenants.ReplicationConnection do
377376
|> Map.new(fn
378377
{nil, %{name: name}} -> {name, nil}
379378
{value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
380-
{value, %{name: name, type: "bool"}} -> {name, value == "t"}
381379
{value, %{name: name}} -> {name, value}
382380
end)
383381
end

0 commit comments

Comments
 (0)