-
Notifications
You must be signed in to change notification settings - Fork 1
Outboxer publisher block examples
Adam Mikulasev edited this page Jun 21, 2025
·
10 revisions
This page shows how to implement different message publishing strategies inside your bin/outboxer_publisher script using Sidekiq, RabbitMQ (Bunny), and Kafka (ruby-kafka).
The block passed to Outboxer::Publisher.publish_messages is responsible for publishing each message and marking it as either published or failed.
require 'sidekiq'

# Publishes each message as its own Sidekiq job and records the outcome per
# message: a message whose push (or published-marking) raises is marked failed
# without affecting the other messages in the batch.
Outboxer::Publisher.publish_messages do |publisher, messages|
  publisher_attrs = { publisher_id: publisher[:id], publisher_name: publisher[:name] }

  messages.each do |message|
    begin
      # The job class name is derived from the messageable type, e.g. "EventCreatedJob".
      Sidekiq::Client.push(
        'class' => "#{message[:messageable_type]}CreatedJob",
        'args' => [message[:id]]
      )

      Outboxer::Message.published_by_ids(ids: [message[:id]], **publisher_attrs)
    rescue => error
      Outboxer::Message.failed_by_ids(ids: [message[:id]], exception: error, **publisher_attrs)
    end
  end
end

require 'sidekiq'
# Publishes messages in bulk, one Sidekiq push_bulk call per job class.
#
# The outcome is recorded per job-class group rather than for the whole batch:
# if a later group raises, messages that an earlier push_bulk already enqueued
# keep their published status instead of being marked failed (which would
# cause them to be enqueued a second time on retry).
Outboxer::Publisher.publish_messages do |publisher, messages|
  grouped_jobs = messages.group_by do |message|
    "#{message[:messageable_type]}CreatedJob"
  end

  grouped_jobs.each do |job_class_name, grouped_messages|
    ids = grouped_messages.map { |message| message[:id] }

    begin
      Sidekiq::Client.push_bulk(
        'class' => job_class_name,
        'args' => ids.map { |id| [id] }
      )

      Outboxer::Message.published_by_ids(
        ids: ids,
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      )
    rescue => exception
      # Only this group's messages are marked failed.
      Outboxer::Message.failed_by_ids(
        ids: ids,
        exception: exception,
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      )
    end
  end
end

require "bunny"
RabbitmqClient = Bunny.new.start
channel = RabbitmqClient.create_channel
exchange = channel.default_exchange

# Publishes each message to the default exchange as persistent JSON, routed by
# the underscored messageable type, and records the outcome per message.
#
# NOTE(review): `channel`/`exchange` are shared by every invocation of this
# block — confirm that is safe under the publisher's concurrency setting.
Outboxer::Publisher.publish_messages do |publisher, messages|
  publisher_attrs = { publisher_id: publisher[:id], publisher_name: publisher[:name] }

  messages.each do |message|
    begin
      payload = message.to_json
      routing_key = message[:messageable_type].underscore

      exchange.publish(payload, routing_key: routing_key, persistent: true)

      Outboxer::Message.published_by_ids(ids: [message[:id]], **publisher_attrs)
    rescue => error
      Outboxer::Message.failed_by_ids(ids: [message[:id]], exception: error, **publisher_attrs)
    end
  end
end

require "kafka"
KafkaClient = Kafka.new(["localhost:9092"])
producer = KafkaClient.producer
topic = "messages"

# Buffers the whole batch on the producer, delivers it in one call, and then
# marks every message in the batch as published. Any error marks the whole
# batch as failed.
Outboxer::Publisher.publish_messages do |publisher, messages|
  message_ids = messages.map { |message| message[:id] }

  begin
    messages.each do |message|
      producer.produce(
        message.to_json,
        topic: topic,
        # Keyed by messageable so messages for the same record share a key.
        key: "#{message[:messageable_type]}:#{message[:messageable_id]}"
      )
    end

    producer.deliver_messages

    Outboxer::Message.published_by_ids(
      ids: message_ids,
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  rescue => exception
    # The producer outlives this block call. Drop anything still buffered so
    # messages we are about to mark as failed are not delivered together with
    # the next batch.
    producer.clear_buffer

    Outboxer::Message.failed_by_ids(
      ids: message_ids,
      exception: exception,
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  end
end

require "aws-sdk-sqs"
require "logger"

sqs_client = Aws::SQS::Client.new
queue_url = ENV.fetch("AWS_SQS_QUEUE_URL")
# Passed to the publisher and used by the rescue branch below.
logger = Logger.new($stdout)

# Sends messages to SQS in batches of up to 10 entries and records the outcome
# per entry, using the successful/failed lists in the batch response. If the
# batch call itself raises, every message in that slice is marked failed.
Outboxer::Publisher.publish_messages(
  concurrency: 4,
  batch_size: 10,
  logger: logger
) do |publisher, messages|
  messages.each_slice(10) do |message_batch|
    # The entry id carries the message id so results can be mapped back to messages.
    entries = message_batch.map do |message|
      {
        id: message[:id].to_s,
        message_body: message.to_json
      }
    end

    begin
      response = sqs_client.send_message_batch(queue_url: queue_url, entries: entries)

      successful_ids = response.successful.map { |entry| entry.id.to_i }
      failed_ids = response.failed.map { |entry| entry.id.to_i }

      unless successful_ids.empty?
        Outboxer::Message.published_by_ids(
          ids: successful_ids,
          publisher_id: publisher[:id],
          publisher_name: publisher[:name]
        )
      end

      unless failed_ids.empty?
        Outboxer::Message.failed_by_ids(
          ids: failed_ids,
          exception: StandardError.new("Some messages failed in batch send"),
          publisher_id: publisher[:id],
          publisher_name: publisher[:name]
        )
      end
    rescue => exception
      logger.error "[outboxer] Batch failed: #{exception.class} #{exception.message}"

      Outboxer::Message.failed_by_ids(
        ids: message_batch.map { |message| message[:id] },
        exception: exception,
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      )
    end
  end
end

require "rails_helper"
RSpec.describe "Outboxer publisher integration", type: :integration do
# Queues ten messages, runs the real publisher script for five seconds, then
# checks that every queued message id is listed as published.
it "publishes a batch of messages" do
  messages = 10.times.map do |i|
    Outboxer::Message.queue(
      messageable_type: "Event",
      messageable_id: i.to_s
    )
  end

  pid = spawn("bin/outboxer_publisher")
  sleep 5 # give the publisher time to pick up and publish the batch
  Process.kill("TERM", pid)
  Process.wait(pid)

  published_ids = Outboxer::Message
    .list(status: :published)[:messages]
    .map { |message| message[:id] }

  # Read ids with [:id], the same way the listed messages above are read,
  # rather than calling #id on the queued values.
  queued_ids = messages.map { |message| message[:id] }

  expect(published_ids).to include(*queued_ids)
end
end

- The block passed to `publish_messages` must be thread-safe.
- Always call `published_by_ids` or `failed_by_ids` at least once for every message.