-
Notifications
You must be signed in to change notification settings - Fork 1
Outboxer publisher
Adam Mikulasev edited this page Oct 15, 2025
·
2 revisions
# frozen_string_literal: true
require "outboxer"
require "outboxer/logger"
module Outboxer
  # Runs a pool of worker threads that repeatedly call
  # Outboxer::Message.publish, and lets the process be paused, resumed and
  # shut down through signals.
  module Publisher
    module_function

    # States shared between the signal control loop and the worker threads.
    module Status
      PUBLISHING = :publishing
      STOPPED = :stopped
      TERMINATING = :terminating
    end

    # Signals that are forwarded to the control loop while publish_message runs.
    SIGNALS = %w[TSTP CONT TERM INT].freeze
    private_constant :SIGNALS

    # Starts +concurrency+ worker threads and blocks until TERM or INT is
    # received (or the signal pipe fails), then waits for the workers to exit.
    #
    # Signal handling while running:
    # * TSTP      - workers stop publishing and idle
    # * CONT      - workers resume publishing
    # * TERM, INT - workers finish their current iteration and exit
    #
    # The signal handlers that were installed before the call are restored
    # and the internal pipe is closed before the method returns or raises.
    #
    # @param concurrency [Integer] number of worker threads; defaults to the
    #   OUTBOXER_CONCURRENCY environment variable, or 4 when it is unset
    # @param logger [#info, #error] receives start/stop and failure messages
    # @yield [message] forwarded from the block given to
    #   Outboxer::Message.publish
    # @raise [ArgumentError] when no block is given
    # @return [Object] whatever the final logger.info call returns
    def publish_message(concurrency: Integer(ENV.fetch("OUTBOXER_CONCURRENCY", "4")),
                        logger: Outboxer::Logger.new($stdout), &block)
      raise ArgumentError, "publish_message requires a block" unless block

      status = Status::PUBLISHING
      self_read, self_write = IO.pipe
      previous_handlers = {}
      threads = []

      begin
        SIGNALS.each { |name| previous_handlers[name] = forward_signal(name, self_write) }

        logger.info("Starting publisher with concurrency=#{concurrency}")

        threads = Array.new(concurrency) do |i|
          Thread.new do
            Thread.current.name = "outboxer.worker.#{i + 1}"

            until status == Status::TERMINATING
              case status
              when Status::STOPPED
                sleep(1)
              when Status::PUBLISHING
                begin
                  # NOTE(review): a nil result is treated as "nothing to
                  # publish right now" - confirm against Outboxer::Message.
                  published = Outboxer::Message.publish { |message| block.call(message) }
                  sleep(1) if published.nil?
                rescue StandardError => error
                  # Report the failure instead of hiding it, then back off
                  # so a persistent error does not turn into a hot loop.
                  logger.error("Publishing failed: #{error.class}: #{error.message}")
                  sleep(1)
                end
              end
            end
          end
        end

        begin
          # Each line on the pipe is the name of a signal received by a trap.
          while (signal = self_read.gets&.strip)
            case signal
            when "TSTP"
              status = Status::STOPPED
            when "CONT"
              status = Status::PUBLISHING
            when "TERM", "INT"
              status = Status::TERMINATING
              break
            end
          end
        rescue IOError => error
          logger.error("Signal pipe failed: #{error.class}: #{error.message}")
        end
      ensure
        # Must happen before joining: on any exit path other than TERM/INT the
        # workers would otherwise keep looping and the join would never return.
        status = Status::TERMINATING
        begin
          threads.each(&:join)
        ensure
          # Signal.trap may report nil for a handler it cannot describe;
          # fall back to Ruby's default handler in that case.
          previous_handlers.each { |name, handler| Signal.trap(name, handler || "DEFAULT") }
          self_read.close
          self_write.close
        end
      end

      logger.info("Publisher stopped cleanly (status=#{status})")
    end

    # Installs a trap for +name+ that writes the signal name as one line to
    # +writer+, and returns the handler that was previously installed.
    def forward_signal(name, writer)
      line = "#{name}\n"
      Signal.trap(name) do
        begin
          writer.write_nonblock(line)
        rescue IOError, SystemCallError
          # Best effort: a full or already closed pipe must never raise out
          # of a signal handler.
          nil
        end
      end
    end
    private_class_method :forward_signal
  end
end