Skip to content

Outboxer publisher

Adam Mikulasev edited this page Oct 15, 2025 · 2 revisions
# frozen_string_literal: true

require "outboxer"
require "outboxer/logger"

module Outboxer
  # Multi-threaded, signal-controlled publishing loop built on top of
  # Outboxer::Message.publish.
  module Publisher
    module_function

    # Lifecycle states shared between the signal loop and the worker threads.
    module Status
      PUBLISHING   = :publishing
      STOPPED      = :stopped
      TERMINATING  = :terminating
    end

    # Runs +concurrency+ worker threads that repeatedly call
    # Outboxer::Message.publish, handing each yielded message to the block,
    # and blocks the calling thread until a TERM or INT signal is received.
    #
    # Signals handled while running:
    # * TSTP      - workers idle (status becomes STOPPED)
    # * CONT      - workers resume (status becomes PUBLISHING)
    # * TERM, INT - workers finish their current iteration and exit
    #
    # The signal handlers that were installed before the call are restored and
    # the internal pipe is closed before the method returns.
    #
    # @param concurrency [Integer] number of worker threads; defaults to the
    #   OUTBOXER_CONCURRENCY environment variable, or 4 when it is unset
    # @param logger [#info, #error] destination for lifecycle and error
    #   messages (assumes the logger responds to #error like ::Logger -
    #   TODO confirm for Outboxer::Logger)
    # @yieldparam message the message yielded by Outboxer::Message.publish
    # @raise [ArgumentError] if no block is given
    # @return the result of the final logger.info call
    def publish_message(concurrency: Integer(ENV.fetch("OUTBOXER_CONCURRENCY", "4")),
                        logger: Outboxer::Logger.new($stdout), &block)
      raise ArgumentError, "publish_message requires a block" unless block

      status = Status::PUBLISHING
      self_read, self_write = IO.pipe
      previous_handlers = {}
      threads = []

      begin
        # Self-pipe: each trap handler only writes the signal name; the real
        # state change happens in the loop below, outside trap context.
        %w[TSTP CONT TERM INT].each do |signal_name|
          previous_handlers[signal_name] = Signal.trap(signal_name) do
            begin
              self_write.write_nonblock("#{signal_name}\n")
            rescue IOError, SystemCallError
              # Best effort: a full or already-closed pipe must not raise
              # from inside a signal handler.
              nil
            end
          end
        end

        logger.info("Starting publisher with concurrency=#{concurrency}")

        threads = Array.new(concurrency) do |i|
          Thread.new do
            Thread.current.name = "outboxer.worker.#{i + 1}"

            while status != Status::TERMINATING
              case status
              when Status::STOPPED
                sleep(0.1)
              when Status::PUBLISHING
                begin
                  message_published = Outboxer::Message.publish do |message|
                    block.call(message)
                    # NOTE(review): random 50-150ms pause after every callback;
                    # looks like throttling or demo pacing - confirm intent.
                    sleep(rand(0.05..0.15))
                  end
                  # Back off briefly when publish returned nil (presumably
                  # nothing was available to publish - verify).
                  sleep(0.1) if message_published.nil?
                rescue StandardError => e
                  # Report the failure instead of discarding it, then back off
                  # before the next attempt.
                  logger.error("Publisher worker error: #{e.class}: #{e.message}")
                  sleep(0.5)
                end
              end
            end
          end
        end

        begin
          while (signal = self_read.gets&.strip)
            case signal
            when "TSTP"
              status = Status::STOPPED
            when "CONT"
              status = Status::PUBLISHING
            when "TERM", "INT"
              status = Status::TERMINATING
              break
            end
          end
        rescue IOError
          # Pipe closed or unreadable: fall through to shutdown.
        end
      ensure
        # Always flip to TERMINATING before joining; otherwise an exit from the
        # signal loop without TERM/INT would leave the workers looping and the
        # join below would never return.
        status = Status::TERMINATING
        threads.each(&:join)

        # Put back whatever handlers were active before this call. A nil
        # previous handler is mapped to "DEFAULT" because trapping with nil
        # would ignore the signal.
        previous_handlers.each do |signal_name, handler|
          Signal.trap(signal_name, handler || "DEFAULT")
        end

        self_read.close
        self_write.close
      end

      logger.info("Publisher stopped cleanly (status=#{status})")
    end
  end
end

Clone this wiki locally