Skip to content

Commit b05450b

Browse files
authored
add Message.count_by_status (#374)
1 parent 95e36bb commit b05450b

27 files changed

+637
-839
lines changed

db/migrate/create_outboxer_counters.rb

Lines changed: 0 additions & 25 deletions
This file was deleted.

db/migrate/create_outboxer_message_counts.rb

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
1-
class CreateOutboxerMessageCounts < ActiveRecord::Migration[7.1]
1+
class CreateOutboxerMessageCounts < ActiveRecord::Migration[6.1]
22
def up
33
create_table :outboxer_message_counts do |t|
4-
t.string :status, limit: 255, null: false
5-
t.integer :partition, null: false
6-
t.bigint :value, null: false
7-
t.timestamps
4+
t.string :hostname, limit: 255, null: false
5+
t.integer :process_id, null: false
6+
t.integer :thread_id, null: false
7+
8+
t.integer :queued, null: false
9+
t.integer :publishing, null: false
10+
t.integer :published, null: false
11+
t.integer :failed, null: false
12+
13+
t.timestamps null: false
814
end
915

10-
add_index :outboxer_message_counts, [:status, :partition],
11-
unique: true, name: :idx_outboxer_counts_status_partition
16+
add_index :outboxer_message_counts, [:hostname, :process_id, :thread_id],
17+
unique: true, name: "idx_outboxer_message_counts_identity"
1218
end
1319

1420
def down

db/migrate/create_outboxer_message_totals.rb

Lines changed: 0 additions & 17 deletions
This file was deleted.

generators/install_generator.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ def copy_migrations
2121
"db/migrate/create_outboxer_message_counts.rb",
2222
"db/migrate/create_outboxer_message_counts.rb")
2323

24-
migration_template(
25-
"db/migrate/create_outboxer_message_totals.rb",
26-
"db/migrate/create_outboxer_message_totals.rb")
27-
2824
migration_template(
2925
"db/migrate/create_outboxer_exceptions.rb",
3026
"db/migrate/create_outboxer_exceptions.rb")

lib/outboxer.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,12 @@
88
require_relative "outboxer/models/frame"
99
require_relative "outboxer/models/exception"
1010
require_relative "outboxer/models/message"
11-
require_relative "outboxer/models/message_count"
12-
require_relative "outboxer/models/message_total"
13-
require_relative "outboxer/models/counter"
11+
require_relative "outboxer/models/message/count"
1412

1513
require_relative "outboxer/models/publisher"
1614
require_relative "outboxer/models/signal"
1715

1816
require_relative "outboxer/database"
19-
require_relative "outboxer/counter"
2017
require_relative "outboxer/message"
2118
require_relative "outboxer/publisher"
2219

lib/outboxer/counter.rb

Lines changed: 0 additions & 86 deletions
This file was deleted.

lib/outboxer/database.rb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,9 @@ def truncate(logger: nil)
8383
if connection.adapter_name.downcase.include?("postgres")
8484
connection.execute(<<~SQL)
8585
TRUNCATE TABLE
86-
outboxer_message_counts,
87-
outboxer_message_totals,
8886
outboxer_frames,
8987
outboxer_exceptions,
88+
outboxer_message_counts,
9089
outboxer_messages,
9190
outboxer_signals,
9291
outboxer_publishers
@@ -97,10 +96,9 @@ def truncate(logger: nil)
9796

9897
begin
9998
connection.execute("SET FOREIGN_KEY_CHECKS = 0;")
100-
connection.execute("TRUNCATE TABLE outboxer_message_counts;")
101-
connection.execute("TRUNCATE TABLE outboxer_message_totals;")
10299
connection.execute("TRUNCATE TABLE outboxer_frames;")
103100
connection.execute("TRUNCATE TABLE outboxer_exceptions;")
101+
connection.execute("TRUNCATE TABLE outboxer_message_counts;")
104102
connection.execute("TRUNCATE TABLE outboxer_messages;")
105103
connection.execute("TRUNCATE TABLE outboxer_signals;")
106104
connection.execute("TRUNCATE TABLE outboxer_publishers;")

lib/outboxer/loader.rb

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ module Loader
1313
LOAD_DEFAULTS = {
1414
batch_size: 1_000,
1515
concurrency: 5,
16-
size: 1_000_000,
16+
size: 1,
1717
tick_interval: 1
1818
}
1919

@@ -35,7 +35,7 @@ def parse_cli_options(argv)
3535
options[:concurrency] = v
3636
end
3737

38-
opts.on("--size SIZE", Integer, "Number of messages to load (default: 1M)") do |v|
38+
opts.on("--size SIZE", Integer, "Number of messages to load (default: 1)") do |v|
3939
options[:size] = v
4040
end
4141

@@ -65,24 +65,28 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size],
6565
size: LOAD_DEFAULTS[:size],
6666
tick_interval: LOAD_DEFAULTS[:tick_interval],
6767
logger: Outboxer::Logger.new($stdout))
68-
status = :loading
69-
reader, _writer = trap_signals
68+
Thread.main[:status] = :loading
69+
70+
Signal.trap("INT") { Thread.main[:status] = :terminating }
71+
Signal.trap("TERM") { Thread.main[:status] = :terminating }
72+
7073
queue = Queue.new
7174
threads = spawn_workers(concurrency, queue, logger)
7275

7376
enqueued = 0
7477
started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
7578

7679
while enqueued < size
77-
status = read_status(reader) || status
78-
79-
case status
80+
case Thread.main[:status]
8081
when :terminating
8182
break
8283
when :stopped
8384
sleep tick_interval
8485
when :loading
85-
messageables = Array.new(batch_size) do
86+
remaining = size - enqueued
87+
count = [batch_size, remaining].min
88+
89+
messageables = Array.new(count) do
8690
OpenStruct.new(class: OpenStruct.new(name: "Event"), id: SecureRandom.hex(3))
8791
end
8892

@@ -91,8 +95,6 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size],
9195

9296
# logger.info "[main] enqueued #{enqueued}/#{size}" if (enqueued % (batch_size * 2)).zero?
9397
end
94-
95-
# display_metrics(logger)
9698
end
9799

98100
queue.close
@@ -108,41 +110,16 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size],
108110
logger.info "[main] done"
109111
end
110112

111-
def trap_signals
112-
reader, writer = IO.pipe
113-
114-
%w[INT TERM TSTP CONT].each do |signal|
115-
Signal.trap(signal) { writer.puts(signal) }
116-
end
117-
118-
[reader, writer]
119-
end
120-
121-
def read_status(reader)
122-
line = reader.ready? ? reader.gets&.strip : nil
123-
124-
case line
125-
when "INT", "TERM" then :terminating
126-
when "TSTP" then :stopped
127-
when "CONT" then :loading
128-
end
129-
end
130-
131-
def spawn_workers(concurrency, queue, logger)
132-
Array.new(concurrency) do |index|
113+
def spawn_workers(concurrency, queue, _logger)
114+
Array.new(concurrency) do |_index|
133115
Thread.new do
134-
begin
135-
while (messageables = queue.pop)
136-
messageables.each do |messageable|
137-
Outboxer::Message.queue(messageable: messageable)
138-
rescue StandardError => error
139-
logger.error "[thread-#{index}] #{error.class}: #{error.message}"
140-
141-
sleep 1
142-
end
116+
while Thread.main[:status] != :terminating
117+
messageables = queue.pop
118+
break if messageables.nil?
119+
120+
messageables.each do |messageable|
121+
Outboxer::Message.queue(messageable: messageable)
143122
end
144-
rescue ClosedQueueError
145-
# Queue closed and empty — exit gracefully
146123
end
147124
end
148125
end

0 commit comments

Comments
 (0)