From eead53d85281100855380d025c35b0c2962dc49a Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Mon, 27 Oct 2025 18:51:42 +1100 Subject: [PATCH 1/2] add counter rollup --- db/migrate/create_outboxer_counters.rb | 6 +- lib/outboxer.rb | 1 + lib/outboxer/counter.rb | 120 ++++++++++++ spec/lib/outboxer/counter_spec.rb | 254 +++++++++++++++++++++++++ 4 files changed, 378 insertions(+), 3 deletions(-) create mode 100644 lib/outboxer/counter.rb create mode 100644 spec/lib/outboxer/counter_spec.rb diff --git a/db/migrate/create_outboxer_counters.rb b/db/migrate/create_outboxer_counters.rb index 37200725..618d6d50 100644 --- a/db/migrate/create_outboxer_counters.rb +++ b/db/migrate/create_outboxer_counters.rb @@ -1,9 +1,9 @@ class CreateOutboxerCounters < ActiveRecord::Migration[6.1] def up create_table :outboxer_counters do |t| - t.string :hostname, limit: 255 - t.integer :process_id - t.integer :thread_id + t.string :hostname, limit: 255, null: false + t.integer :process_id, null: false + t.integer :thread_id, null: false t.integer :publisher_id diff --git a/lib/outboxer.rb b/lib/outboxer.rb index 48d14579..390da366 100644 --- a/lib/outboxer.rb +++ b/lib/outboxer.rb @@ -16,6 +16,7 @@ require_relative "outboxer/models/signal" require_relative "outboxer/database" +require_relative "outboxer/counter" require_relative "outboxer/message" require_relative "outboxer/publisher" diff --git a/lib/outboxer/counter.rb b/lib/outboxer/counter.rb new file mode 100644 index 00000000..5d17b835 --- /dev/null +++ b/lib/outboxer/counter.rb @@ -0,0 +1,120 @@ +module Outboxer + module Counter + HISTORIC_HOSTNAME = "historic" + HISTORIC_PROCESS_ID = 0 + HISTORIC_THREAD_ID = 0 + + # Returns historic counter details + # + # @return [Hash] a hash containing the historic counter: + # { + # queued_count: Integer, + # publishing_count: Integer, + # published_count: Integer, + # failed_count: Integer + # } + def self.historic + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + counter = Models::Counter.lock("FOR UPDATE").find_by!( + hostname: HISTORIC_HOSTNAME, + process_id: HISTORIC_PROCESS_ID, + thread_id: HISTORIC_THREAD_ID) + + { + queued_count: counter.queued_count, + publishing_count: counter.publishing_count, + published_count: counter.published_count, + failed_count: counter.failed_count + } + end + end + rescue ActiveRecord::RecordNotFound + { + queued_count: 0, + publishing_count: 0, + published_count: 0, + failed_count: 0 + } + end + + # Rolls up thread_counters for the given publisher_id into the historic_counter, + # then destroys rolled-up thread_counters. + # + # @param publisher_id [Integer] publisher to roll up + # @param time [Time] timestamp context for updated_at + # @return [Hash] new historic_totals after rollup + def self.rollup(publisher_id:, time:) + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + Models::Counter.insert_or_increment_by( + hostname: HISTORIC_HOSTNAME, + process_id: HISTORIC_PROCESS_ID, + thread_id: HISTORIC_THREAD_ID, + time: time + ) + + historic_counter = Models::Counter.lock("FOR UPDATE").find_by!( + hostname: HISTORIC_HOSTNAME, + process_id: HISTORIC_PROCESS_ID, + thread_id: HISTORIC_THREAD_ID + ) + + thread_counters = Models::Counter.lock("FOR UPDATE") + .where("publisher_id = ? OR publisher_id IS NULL", publisher_id) + .where.not( + hostname: HISTORIC_HOSTNAME, + process_id: HISTORIC_PROCESS_ID, + thread_id: HISTORIC_THREAD_ID) + + thread_counter_totals = Models::Counter + .where(id: thread_counters.select(:id)) + .select( + "COALESCE(SUM(queued_count), 0) AS queued_count", + "COALESCE(SUM(publishing_count), 0) AS publishing_count", + "COALESCE(SUM(published_count), 0) AS published_count", + "COALESCE(SUM(failed_count), 0) AS failed_count" + ).take.attributes.symbolize_keys.slice( + :queued_count, :publishing_count, :published_count, :failed_count) + + historic_counter.update!( + queued_count: historic_counter.queued_count + thread_counter_totals[:queued_count], + publishing_count: historic_counter.publishing_count + + thread_counter_totals[:publishing_count], + published_count: historic_counter.published_count + + thread_counter_totals[:published_count], + failed_count: historic_counter.failed_count + thread_counter_totals[:failed_count], + updated_at: time + ) + + thread_counters.destroy_all + + { + queued_count: historic_counter.queued_count, + publishing_count: historic_counter.publishing_count, + published_count: historic_counter.published_count, + failed_count: historic_counter.failed_count + } + end + end + end + + # Returns total counts across the historic_counter and all thread_counters. + # + # @return [Hash] total counts across all rows + def self.total + ActiveRecord::Base.connection_pool.with_connection do + result = Models::Counter.select( + "COALESCE(SUM(queued_count), 0) AS queued_count", + "COALESCE(SUM(publishing_count), 0) AS publishing_count", + "COALESCE(SUM(published_count), 0) AS published_count", + "COALESCE(SUM(failed_count), 0) AS failed_count" + ).take + + result.attributes.symbolize_keys.slice( + :queued_count, :publishing_count, :published_count, :failed_count + ) + end + end + end +end diff --git a/spec/lib/outboxer/counter_spec.rb b/spec/lib/outboxer/counter_spec.rb new file mode 100644 index 00000000..e7224189 --- /dev/null +++ b/spec/lib/outboxer/counter_spec.rb @@ -0,0 +1,254 @@ +# frozen_string_literal: true + +require "rails_helper" + +module Outboxer + RSpec.describe Counter do + let(:time_now) { Time.utc(2025, 1, 1, 0, 0, 0) } + let(:publisher_a) { 100 } + let(:publisher_b) { 200 } + + before { Models::Counter.delete_all } + + describe ".historic" do + context "when historic counter exists" do + before do + Models::Counter.create!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID, + queued_count: 10, publishing_count: 20, + published_count: 30, failed_count: 40, + created_at: time_now, updated_at: time_now) + end + + it "returns the current historic counts" do + result = Counter.historic + + expect(result).to eq( + queued_count: 10, + publishing_count: 20, + published_count: 30, + failed_count: 40 + ) + end + end + + context "when no historic counter exists" do + it "returns all zero counts" do + result = Counter.historic + + expect(result).to eq( + queued_count: 0, + publishing_count: 0, + published_count: 0, + failed_count: 0 + ) + end + end + end + + describe ".rollup" do + context "when historic counter does not exist" do + let!(:thread_1) do + Models::Counter.create!( + hostname: "worker1.test.local", process_id: 111, thread_id: 21_001, + publisher_id: publisher_a, + queued_count: 1, publishing_count: 2, + published_count: 3, failed_count: 4, + created_at: time_now, updated_at: time_now) + end + + let!(:thread_2) do + Models::Counter.create!( + hostname: "worker1.test.local", process_id: 111, thread_id: 21_002, + publisher_id: publisher_a, + queued_count: 10, publishing_count: 20, + published_count: 30, failed_count: 40, + created_at: time_now, updated_at: time_now) + end + + it "creates the historic counter and rolls up all threads" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 11, + publishing_count: 22, + published_count: 33, + failed_count: 44 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(11) + expect(historic.publishing_count).to eq(22) + expect(historic.published_count).to eq(33) + expect(historic.failed_count).to eq(44) + + expect(Models::Counter.where.not(hostname: Counter::HISTORIC_HOSTNAME)).to be_empty + end + end + + context "when historic counter already exists" do + let!(:historic) do + Models::Counter.create!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID, + queued_count: 100, publishing_count: 200, + published_count: 300, failed_count: 400, + created_at: time_now, updated_at: time_now) + end + + let!(:thread) do + Models::Counter.create!( + hostname: "worker2.test.local", process_id: 222, thread_id: 22_001, + publisher_id: publisher_a, + queued_count: 1, publishing_count: 1, + published_count: 1, failed_count: 1, + created_at: time_now, updated_at: time_now) + end + + it "increments existing historic counters" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 101, + publishing_count: 201, + published_count: 301, + failed_count: 401 + ) + + historic.reload + + expect(historic.queued_count).to eq(101) + expect(historic.publishing_count).to eq(201) + expect(historic.published_count).to eq(301) + expect(historic.failed_count).to eq(401) + end + end + + context "when thread counters belong to a different publisher" do + let!(:thread) do + Models::Counter.create!( + hostname: "worker3.test.local", process_id: 333, thread_id: 23_001, + publisher_id: publisher_b, + queued_count: 5, publishing_count: 6, + published_count: 7, failed_count: 8, + created_at: time_now, updated_at: time_now) + end + + it "creates the historic counter but does not include other publishers" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 0, + publishing_count: 0, + published_count: 0, + failed_count: 0 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(0) + expect(historic.publishing_count).to eq(0) + expect(historic.published_count).to eq(0) + expect(historic.failed_count).to eq(0) + + expect(Models::Counter.where(publisher_id: publisher_b).count).to eq(1) + end + end + + context "when thread counters have nil publisher_id" do + let!(:thread) do + Models::Counter.create!( + hostname: "worker4.test.local", process_id: 444, thread_id: 24_001, + publisher_id: nil, + queued_count: 2, publishing_count: 3, + published_count: 4, failed_count: 5, + created_at: time_now, updated_at: time_now) + end + + it "includes nil publisher rows in the rollup" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 2, + publishing_count: 3, + published_count: 4, + failed_count: 5 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(2) + expect(historic.publishing_count).to eq(3) + expect(historic.published_count).to eq(4) + expect(historic.failed_count).to eq(5) + end + end + + context "when no matching counters exist" do + it "creates a historic counter with all zero counts" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 0, + publishing_count: 0, + published_count: 0, + failed_count: 0 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(0) + expect(historic.publishing_count).to eq(0) + expect(historic.published_count).to eq(0) + expect(historic.failed_count).to eq(0) + end + end + end + + describe ".total" do + before do + Models::Counter.create!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID, + queued_count: 10, publishing_count: 20, + published_count: 30, failed_count: 40, + created_at: time_now, updated_at: time_now) + + Models::Counter.create!( + hostname: "worker5.test.local", process_id: 555, thread_id: 25_001, + publisher_id: publisher_a, + queued_count: 5, publishing_count: 6, + published_count: 7, failed_count: 8, + created_at: time_now, updated_at: time_now) + end + + it "returns total counts across all counters" do + totals = Counter.total + + expect(totals).to eq( + queued_count: 15, + publishing_count: 26, + published_count: 37, + failed_count: 48 + ) + end + end + end +end From f6d1ea4bdcc1d2975071f3ca9fe3c0d1fecc4374 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Mon, 27 Oct 2025 18:56:59 +1100 Subject: [PATCH 2/2] commit changes --- lib/outboxer/counter.rb | 34 -------------------------- spec/lib/outboxer/counter_spec.rb | 40 +------------------------------ 2 files changed, 1 insertion(+), 73 deletions(-) diff --git a/lib/outboxer/counter.rb b/lib/outboxer/counter.rb index 5d17b835..0f4c9e46 100644 --- a/lib/outboxer/counter.rb +++ b/lib/outboxer/counter.rb @@ -4,40 +4,6 @@ module Counter HISTORIC_PROCESS_ID = 0 HISTORIC_THREAD_ID = 0 - # Returns historic counter details - # - # @return [Hash] a hash containing the historic counter: - # { - # queued_count: Integer, - # publishing_count: Integer, - # published_count: Integer, - # failed_count: Integer - # } - def self.historic - ActiveRecord::Base.connection_pool.with_connection do - ActiveRecord::Base.transaction do - counter = Models::Counter.lock("FOR UPDATE").find_by!( - hostname: HISTORIC_HOSTNAME, - process_id: HISTORIC_PROCESS_ID, - thread_id: HISTORIC_THREAD_ID) - - { - queued_count: counter.queued_count, - publishing_count: counter.publishing_count, - published_count: counter.published_count, - failed_count: counter.failed_count - } - end - end - rescue ActiveRecord::RecordNotFound - { - queued_count: 0, - publishing_count: 0, - published_count: 0, - failed_count: 0 - } - end - # Rolls up thread_counters for the given publisher_id into the historic_counter, # then destroys rolled-up thread_counters. # diff --git a/spec/lib/outboxer/counter_spec.rb b/spec/lib/outboxer/counter_spec.rb index e7224189..6217b177 100644 --- a/spec/lib/outboxer/counter_spec.rb +++ b/spec/lib/outboxer/counter_spec.rb @@ -10,44 +10,6 @@ module Outboxer before { Models::Counter.delete_all } - describe ".historic" do - context "when historic counter exists" do - before do - Models::Counter.create!( - hostname: Counter::HISTORIC_HOSTNAME, - process_id: Counter::HISTORIC_PROCESS_ID, - thread_id: Counter::HISTORIC_THREAD_ID, - queued_count: 10, publishing_count: 20, - published_count: 30, failed_count: 40, - created_at: time_now, updated_at: time_now) - end - - it "returns the current historic counts" do - result = Counter.historic - - expect(result).to eq( - queued_count: 10, - publishing_count: 20, - published_count: 30, - failed_count: 40 - ) - end - end - - context "when no historic counter exists" do - it "returns all zero counts" do - result = Counter.historic - - expect(result).to eq( - queued_count: 0, - publishing_count: 0, - published_count: 0, - failed_count: 0 - ) - end - end - end - describe ".rollup" do context "when historic counter does not exist" do let!(:thread_1) do @@ -68,7 +30,7 @@ module Outboxer created_at: time_now, updated_at: time_now) end - it "creates the historic counter and rolls up all threads" do + it "creates the historic counter and rolls up all the thread counters" do result = Counter.rollup(publisher_id: publisher_a, time: time_now) expect(result).to eq(