Skip to content

How outboxer works

Adam Mikulasev edited this page Apr 18, 2025 · 8 revisions

Queueing messages

sequenceDiagram
    participant A as Application Service
    participant T as ActiveRecord::Base.transaction
    participant E as Event
    participant Q as Outboxer::Message
    participant M as Outboxer::Models::Message

    A->>T: begin
    activate T

    T->>E: create!
    note right of E: in after_create callback
    E->>Q: queue(messageable: event)
    activate Q
    Q->>M: create!(messageable: event, status: "queued")
    deactivate Q

    T->>T: commit
    deactivate T

Loading

Publishing messages

sequenceDiagram
    participant CLI as bin/outboxer_publisher
    participant P as Outboxer::Publisher
    participant M as Outboxer::Models::Message
    participant J as OutboxerIntegration::PublishMessageJob
    participant R as Redis (Sidekiq)

    CLI->>P: publish_message(&block)
    activate P

    P->>M: dequeue(status: "queued")

    loop for each message
        P->>M: update status: "publishing"
        P->>J: perform_async(message_id, messageable_id, messageable_type)
        J->>R: enqueue Sidekiq job
        R-->>J: acknowledged
        P->>M: update status: "published"
    end

    P-->>CLI: return
    deactivate P

Loading
sequenceDiagram
    participant P as Outboxer::Publisher
    participant M as Outboxer::Models::Message
    participant J as OutboxerIntegration::PublishMessageJob
    participant R as Redis (Sidekiq)

    P->>M: dequeue(status: "queued")
    activate P

    loop for each message
        P->>M: update status: "publishing"
        P->>J: perform_async(message_id, messageable_id, messageable_type)
        J->>R: enqueue Sidekiq job
        R-->>J: acknowledged
        P->>M: update status: "published"
    end

    deactivate P
Loading

Queue side

Sequence diagram

sequenceDiagram
    participant S as Accountify::InvoiceService.draft
    participant I as Accountify::Invoice
    participant L as Accountify::InvoiceLineItem
    participant C as Accountify::Contact
    participant E as Accountify::InvoiceDraftedEvent
    note right of E: subclass of Event
    participant Q as Outboxer::Message
    participant M as Outboxer::Models::Message

    S->>S: Begin Transaction
    activate S

    S->>I: create!
    activate I
    I-->>S: invoice created
    deactivate I

    S->>L: create! (multiple line items)
    activate L
    L-->>S: line items created
    deactivate L

    S->>C: associate! (belongs_to)
    activate C
    C-->>S: contact associated
    deactivate C

    S->>E: create! (event)
    activate E
    note right of E: triggers Event's after_create callback
    E->>Q: Outboxer::Message.queue(messageable: event)
    activate Q

    Q->>M: create! (message with messageable = event)
    activate M
    M-->>Q: message persisted
    deactivate M

    Q-->>E: return
    deactivate Q
    E-->>S: event created
    deactivate E

    S->>S: Commit Transaction
    deactivate S
Loading

Implementation

module Accountify
  # Application-level commands for invoices.
  module InvoiceService
    extend self

    # Drafts an invoice with its line items and records an
    # InvoiceDraftedEvent, all inside one ActiveRecord transaction.
    #
    # The organisation is looked up within the tenant, and the contact is
    # looked up within both the tenant and that organisation, before anything
    # is written.
    #
    # @param user_id [Object] stored on the event as +user_id+
    # @param tenant_id [Object] scopes the lookups; stored on invoice and event
    # @param organisation_id [Object] id of the organisation to invoice from
    # @param contact_id [Object] id of a contact within that organisation
    # @param currency_code [String] used for the invoice and its sub-total
    # @param due_date [Object] stored on the invoice as +due_date+
    # @param line_items [Array<Hash>] each item provides +:description+,
    #   +:quantity+ and +:unit_amount+ (a hash with +:amount+ and
    #   +:currency_code+)
    # @param time [#now] clock used for the timestamps; defaults to +::Time+
    #   and can be replaced, e.g. with a fixed clock
    # @return [Hash] +{ id: invoice_id, events: [{ id:, type: }] }+
    # @raise [StandardError] whatever the bang finders/creators raise
    #   (presumably ActiveRecord::RecordNotFound / RecordInvalid — confirm
    #   against the models); the error propagates out of the transaction block
    def draft(user_id:, tenant_id:,
              organisation_id:, contact_id:,
              currency_code:, due_date:, line_items:,
              time: ::Time)
      # Declared before the transaction block so both are still in scope when
      # the return value is built after the block.
      invoice = nil
      event = nil

      # One timestamp shared by the invoice and the event.
      current_utc_time = time.now.utc

      ActiveRecord::Base.transaction do
        organisation = Organisation
          .where(tenant_id: tenant_id)
          .find_by!(id: organisation_id)

        contact = Contact
          .where(tenant_id: tenant_id)
          .find_by!(organisation_id: organisation.id, id: contact_id)

        # Sub-total is unit amount (as BigDecimal) times integer quantity,
        # summed over all requested line items.
        sub_total_amount = line_items.sum do |line_item|
          BigDecimal(line_item[:unit_amount][:amount]) * line_item[:quantity].to_i
        end

        invoice = Invoice.create!(
          tenant_id: tenant_id,
          organisation_id: organisation_id,
          contact_id: contact_id,
          status: Invoice::Status::DRAFTED,
          currency_code: currency_code,
          due_date: due_date,
          sub_total_amount: sub_total_amount,
          sub_total_currency_code: currency_code,
          created_at: current_utc_time,
          updated_at: current_utc_time)

        invoice_line_items = line_items.map do |line_item|
          invoice.line_items.create!(
            description: line_item[:description],
            unit_amount_amount: BigDecimal(line_item[:unit_amount][:amount]),
            unit_amount_currency_code: line_item[:unit_amount][:currency_code],
            quantity: line_item[:quantity])
        end

        # Decimal amounts are written to the event body as strings.
        line_items_body = invoice_line_items.map do |invoice_line_item|
          {
            'description' => invoice_line_item.description,
            'unit_amount_amount' => invoice_line_item.unit_amount_amount.to_s,
            'unit_amount_currency_code' => invoice_line_item.unit_amount_currency_code,
            'quantity' => invoice_line_item.quantity
          }
        end

        event = InvoiceDraftedEvent.create!(
          user_id: user_id,
          tenant_id: tenant_id,
          created_at: current_utc_time,
          eventable: invoice,
          body: {
            'invoice' => {
              'id' => invoice.id,
              'organisation_id' => organisation.id,
              'contact_id' => contact.id,
              'status' => invoice.status,
              'currency_code' => invoice.currency_code,
              'due_date' => invoice.due_date,
              'line_items' => line_items_body,
              'sub_total' => {
                'amount' => invoice.sub_total_amount.to_s,
                'currency_code' => invoice.sub_total_currency_code
              }
            }
          })
      end

      { id: invoice.id, events: [{ id: event.id, type: event.type }] }
    end
  end
end

Publisher side

sequenceDiagram
    participant CLI as bin/outboxer_publisher
    participant P as Outboxer::Publisher
    participant DB as Outboxer::Database
    participant Q as Queue
    participant M as Outboxer::Models::Message
    participant W as OutboxerIntegration::PublishMessageJob (Sidekiq)

    CLI->>P: parse_cli_options
    CLI->>DB: connect(config)
    CLI->>P: publish_message(&block)
    activate P

    P->>P: create (publisher record)
    P->>Q: initialize in-memory queue

    loop while publishing
        P->>M: buffer(limit: N)
        M-->>P: messages (status=queued)

        P->>Q: push(message)

        loop for each message
            Q->>P: pop()
            activate P
            P->>M: publishing(id)
            P->>W: perform_async(message_id, messageable_id, messageable_type)
            W-->>P: enqueued
            P->>M: published(id)
            deactivate P
        end
    end

    P->>P: terminate (status=terminating)
    P->>DB: disconnect
    deactivate P
Loading
Clone this wiki locally