Skip to content

Conversation

@bedrock-adam
Copy link
Contributor

@bedrock-adam bedrock-adam commented Nov 10, 2025

Record messages updated to separate per thread model, to remove sweeper entirely

Also improves .count query performance

@bedrock-adam bedrock-adam changed the title Refactor > find by update or create message counter Refactor > optimised publishing Nov 16, 2025
require_relative "outboxer/models/exception"
require_relative "outboxer/models/message"
require_relative "outboxer/models/message/count"
require_relative "outboxer/models/message/counter"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should be a thread?

@bedrock-adam
Copy link
Contributor Author

def rollup_counts(time: Time)
  current_utc_time = time.now.utc

  ActiveRecord::Base.connection_pool.with_connection do
    ActiveRecord::Base.transaction do
      historic_hostname = Models::Message::Counter::HISTORIC_HOSTNAME
      historic_process_id = Models::Message::Counter::HISTORIC_PROCESS_ID
      historic_thread_id = Models::Message::Counter::HISTORIC_THREAD_ID

      # 1. Lock or create the historic counter row.
      historic_counter = Models::Message::Counter.lock.find_by(
        hostname: historic_hostname,
        process_id: historic_process_id,
        thread_id: historic_thread_id
      )

      if historic_counter.nil?
        historic_counter = Models::Message::Counter.create!(
          hostname: historic_hostname,
          process_id: historic_process_id,
          thread_id: historic_thread_id,
          queued_count: 0,
          publishing_count: 0,
          published_count: 0,
          failed_count: 0,
          created_at: current_utc_time,
          updated_at: current_utc_time
        )
      end

      # 2. Capture the exact thread_counter IDs to roll up (stable snapshot).
      thread_counter_ids = Models::Message::Counter
        .where.not(
          hostname: historic_hostname,
          process_id: historic_process_id,
          thread_id: historic_thread_id
        )
        .pluck(:id)

      # Nothing to do if no thread counters exist.
      if thread_counter_ids.empty?
        return {
          queued: historic_counter.queued_count,
          publishing: historic_counter.publishing_count,
          published: historic_counter.published_count,
          failed: historic_counter.failed_count
        }
      end

      # 3. Lock exactly the rows we captured.
      thread_counters = Models::Message::Counter
        .where(id: thread_counter_ids)
        .lock("FOR UPDATE")
        .to_a

      # 4. Compute the rollup totals.
      totals = {
        queued_count: 0,
        publishing_count: 0,
        published_count: 0,
        failed_count: 0
      }

      thread_counters.each do |counter|
        totals[:queued_count]      += counter.queued_count
        totals[:publishing_count]  += counter.publishing_count
        totals[:published_count]   += counter.published_count
        totals[:failed_count]      += counter.failed_count
      end

      # 5. Update the historic counter by adding the totals.
      historic_counter.update!(
        queued_count: historic_counter.queued_count + totals[:queued_count],
        publishing_count: historic_counter.publishing_count + totals[:publishing_count],
        published_count: historic_counter.published_count + totals[:published_count],
        failed_count: historic_counter.failed_count + totals[:failed_count],
        updated_at: current_utc_time
      )

      # 6. Delete exactly the rows we rolled up.
      Models::Message::Counter.where(id: thread_counter_ids).delete_all

      # 7. Return updated totals.
      {
        queued: historic_counter.queued_count,
        publishing: historic_counter.publishing_count,
        published: historic_counter.published_count,
        failed: historic_counter.failed_count
      }
    end
  end
end

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants