Skip to content

Outboxer publisher

Adam Mikulasev edited this page Oct 15, 2025 · 2 revisions
# frozen_string_literal: true

require "outboxer"
require "outboxer/logger"

module Outboxer
  module Publisher
    module_function

    module Status
      PUBLISHING   = :publishing
      STOPPED      = :stopped
      TERMINATING  = :terminating
    end

    def publish_message(concurrency: Integer(ENV.fetch("OUTBOXER_CONCURRENCY", "4")),
                        logger: Outboxer::Logger.new($stdout), &block)
      raise ArgumentError, "publish_message requires a block" unless block

      status = Status::PUBLISHING
      self_read, self_write = IO.pipe

      Signal.trap("TSTP") { self_write.write_nonblock("TSTP\n") rescue nil }
      Signal.trap("CONT") { self_write.write_nonblock("CONT\n") rescue nil }
      Signal.trap("TERM") { self_write.write_nonblock("TERM\n") rescue nil }
      Signal.trap("INT")  { self_write.write_nonblock("INT\n")  rescue nil }

      logger.info("Starting publisher with concurrency=#{concurrency}")

      threads = Array.new(concurrency) do |i|
        Thread.new do
          Thread.current.name = "outboxer.worker.#{i + 1}"

          while status != Status::TERMINATING
            case status
            when Status::STOPPED
              sleep(1)
            when Status::PUBLISHING
              begin
                message = Outboxer::Message.publish { |message| block.call(message) }

                if message.nil?
                  sleep(1)
                end
              rescue StandardError
                sleep(1)
              end
            end
          end
        end
      end

      begin
        while (signal = self_read.gets&.strip)
          case signal
          when "TSTP"
            status = Status::STOPPED
          when "CONT"
            status = Status::PUBLISHING
          when "TERM", "INT"
            status = Status::TERMINATING
            break
          end
        end
      rescue IOError, EOFError
      end

      threads.each(&:join)
      status = Status::TERMINATING
      logger.info("Publisher stopped cleanly (status=#{status})")
    end
  end
end
Clone this wiki locally