Skip to content

Outboxer publisher block examples

Adam Mikulasev edited this page Jun 21, 2025 · 10 revisions

This page shows how to implement different message publishing strategies inside your bin/outboxer_publisher script using Sidekiq, RabbitMQ (Bunny), and Kafka (ruby-kafka).

The block passed to Outboxer::Publisher.publish_messages is responsible for publishing each message and marking it as either published or failed.

Sidekiq Integration

Single message

require 'sidekiq'

Outboxer::Publisher.publish_messages do |publisher, messages|
  messages.each do |message|
    Sidekiq::Client.push(
      'class' => "#{message[:messageable_type]}CreatedJob",
      'args'  => [message[:id]]
    )

    Outboxer::Message.published_by_ids(
      ids: [message[:id]],
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  rescue => exception
    Outboxer::Message.failed_by_ids(
      ids: [message[:id]],
      exception: exception,
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  end
end

Bulk messages

require 'sidekiq'

Outboxer::Publisher.publish_messages do |publisher, messages|
  begin
    grouped_jobs = messages.group_by do |message|
      "#{message[:messageable_type]}CreatedJob"
    end

    grouped_jobs.each do |job_class_name, grouped_messages|
      Sidekiq::Client.push_bulk(
        'class' => job_class_name,
        'args'  => grouped_messages.map { |message| [message[:id]] }
      )
    end

    Outboxer::Message.published_by_ids(
      ids: messages.map { |message| message[:id] },
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  rescue => exception
    Outboxer::Message.failed_by_ids(
      ids: messages.map { |message| message[:id] },
      exception: exception,
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  end
end

RabbitMQ (Bunny) Integration

require "bunny"

RabbitmqClient = Bunny.new.start
channel = RabbitmqClient.create_channel
exchange = channel.default_exchange

Outboxer::Publisher.publish_messages do |publisher, messages|
  messages.each do |message|
    begin
      exchange.publish(
        message.to_json,
        routing_key: message[:messageable_type].underscore,
        persistent: true
      )

      Outboxer::Message.published_by_ids(
        ids: [message[:id]],
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      )
    rescue => exception
      Outboxer::Message.failed_by_ids(
        ids: [message[:id]],
        exception: exception,
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      )
    end
  end
end

Kafka Integration (ruby-kafka)

require "kafka"

KafkaClient = Kafka.new(["localhost:9092"])
producer = KafkaClient.producer

topic = "messages"

Outboxer::Publisher.publish_messages do |publisher, messages|
  begin
    messages.each do |message|
      producer.produce(
        message.to_json,
        topic: topic,
        key: "#{message[:messageable_type]}:#{message[:messageable_id]}"
      )
    end

    producer.deliver_messages

    Outboxer::Message.published_by_ids(
      ids: messages.map { |message| message[:id] },
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  rescue => exception
    Outboxer::Message.failed_by_ids(
      ids: messages.map { |message| message[:id] },
      exception: exception,
      publisher_id: publisher[:id],
      publisher_name: publisher[:name]
    )
  end
end

AWS SQS (aws-sdk-sqs)

require "aws-sdk-sqs"

sqs_client = Aws::SQS::Client.new
queue_url = ENV.fetch("AWS_SQS_QUEUE_URL")

Outboxer::Publisher.publish_messages(
  concurrency: 4,
  batch_size: 10,
  logger: logger
) do |publisher, messages|
  messages.each_slice(10) do |message_batch|
    entries = message_batch.map do |message|
      {
        id: message[:id].to_s,
        message_body: message.to_json,
      }
    end

    begin
      response = sqs_client.send_message_batch(queue_url: queue_url, entries: entries)

      successful_ids = response.successful.map(&:id).map(&:to_i)
      failed_ids     = response.failed.map(&:id).map(&:to_i)

      Outboxer::Message.published_by_ids(
        ids: successful_ids,
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      ) if !successful_ids.empty?

      Outboxer::Message.failed_by_ids(
        ids: failed_ids,
        exception: StandardError.new("Some messages failed in batch send"),
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      ) if !failed_ids.empty?
    rescue => exception
      logger.error "[outboxer] Batch failed: #{exception.class} #{exception.message}"

      Outboxer::Message.failed_by_ids(
        ids: message_batch.map { |message| message[:id] },
        exception: exception,
        publisher_id: publisher[:id],
        publisher_name: publisher[:name]
      )
    end
  end
end

End to end testing

require "rails_helper"

RSpec.describe "Outboxer publisher integration", type: :integration do
  it "publishes a batch of messages" do
    messages = 10.times.map do |i|
      Outboxer::Message.queue(
        messageable_type: "Event",
        messageable_id: i.to_s
      )
    end

    pid = spawn("bin/outboxer_publisher")
    sleep 5
    Process.kill("TERM", pid)
    Process.wait(pid)

    published_ids = Outboxer::Message
      .list(status: :published)[:messages]
      .map { |message| message[:id] }

    expect(published_ids).to include(*messages.map(&:id))
  end
end

Notes

  • The block passed to publish_messages must be thread-safe.
  • Always call published_by_ids or failed_by_ids at least once for every message.

Clone this wiki locally