-
Notifications
You must be signed in to change notification settings - Fork 1
How outboxer works
Adam Mikulasev edited this page Apr 18, 2025
·
8 revisions
sequenceDiagram
participant A as Application Service
participant T as ActiveRecord::Base.transaction
participant E as Event
participant Q as Outboxer::Message
participant M as Outboxer::Models::Message
A->>T: begin
activate T
T->>E: create!
note right of E: in after_create callback
E->>Q: queue(messageable: event)
activate Q
Q->>M: create!(messageable: event, status: "queued")
deactivate Q
T->>T: commit
deactivate T
sequenceDiagram
participant CLI as bin/outboxer_publisher
participant P as Outboxer::Publisher
participant M as Outboxer::Models::Message
participant J as OutboxerIntegration::PublishMessageJob
participant R as Redis (Sidekiq)
CLI->>P: publish_message(&block)
activate P
P->>M: dequeue(status: "queued")
loop for each message
P->>M: update status: "publishing"
P->>J: perform_async(message_id, messageable_id, messageable_type)
J->>R: enqueue Sidekiq job
R-->>J: acknowledged
P->>M: update status: "published"
end
P-->>CLI: return
deactivate P
sequenceDiagram
participant P as Outboxer::Publisher
participant M as Outboxer::Models::Message
participant J as OutboxerIntegration::PublishMessageJob
participant R as Redis (Sidekiq)
P->>M: dequeue(status: "queued")
activate P
loop for each message
P->>M: update status: "publishing"
P->>J: perform_async(message_id, messageable_id, messageable_type)
J->>R: enqueue Sidekiq job
R-->>J: acknowledged
P->>M: update status: "published"
end
deactivate P
sequenceDiagram
participant S as Accountify::InvoiceService.draft
participant I as Accountify::Invoice
participant L as Accountify::InvoiceLineItem
participant C as Accountify::Contact
participant E as Accountify::InvoiceDraftedEvent
note right of E: subclass of Event
participant Q as Outboxer::Message
participant M as Outboxer::Models::Message
S->>S: Begin Transaction
activate S
S->>I: create!
activate I
I-->>S: invoice created
deactivate I
S->>L: create! (multiple line items)
activate L
L-->>S: line items created
deactivate L
S->>C: associate! (belongs_to)
activate C
C-->>S: contact associated
deactivate C
S->>E: create! (event)
activate E
E-->>S: event created
note right of E: triggers Event's after_create callback
E->>Q: Outboxer::Message.queue(messageable: event)
activate Q
Q->>M: create! (message with messageable = event)
activate M
M-->>Q: message persisted
deactivate M
Q-->>E: return
deactivate Q
deactivate E
S->>S: Commit Transaction
deactivate S
# Example application service used on this page to show where an outbox
# message gets queued: the invoice, its line items and the domain event are
# all written inside a single database transaction.
module Accountify
module InvoiceService
# `extend self` exposes the methods below on the module itself, so callers
# invoke `Accountify::InvoiceService.draft(...)` without an instance.
extend self
# Drafts an invoice for a contact of an organisation and records an
# InvoiceDraftedEvent for it.
#
# @param user_id         stored on the event as `user_id`
# @param tenant_id       scopes the organisation/contact lookups and is
#                        stored on the invoice and the event
# @param organisation_id id of the Organisation to look up
# @param contact_id      id of the Contact to look up within that organisation
# @param currency_code   used for both `currency_code` and
#                        `sub_total_currency_code` on the invoice
# @param due_date        stored as the invoice's `due_date`
# @param line_items      enumerable of hash-like items read with the keys
#                        `:description`, `:quantity` and
#                        `:unit_amount` (itself keyed by `:amount` and
#                        `:currency_code`)
# @param time            clock object; only `time.now.utc` is called on it
#                        (presumably injectable so tests can freeze time)
# @return [Hash] `{ id: <invoice id>, events: [{ id:, type: }] }`
# @raise whatever the `find_by!` / `create!` calls raise; in ActiveRecord
#        these are the raising variants (e.g. record not found / invalid) —
#        confirm against the models, which are not shown here
def draft(user_id:, tenant_id:,
organisation_id:, contact_id:,
currency_code:, due_date:, line_items:,
time: ::Time)
# Declared before the block so they are still in scope after the
# transaction block returns (locals first assigned inside a block are
# block-local in Ruby).
invoice = nil
event = nil
# One timestamp is captured up front and reused for the invoice and event.
current_utc_time = time.now.utc
ActiveRecord::Base.transaction do
# Both lookups are restricted to the caller's tenant; the contact must
# additionally belong to the organisation that was just found.
organisation = Organisation
.where(tenant_id: tenant_id)
.find_by!(id: organisation_id)
contact = Contact
.where(tenant_id: tenant_id)
.find_by!(organisation_id: organisation.id, id: contact_id)
invoice = Invoice.create!(
tenant_id: tenant_id,
organisation_id: organisation_id,
contact_id: contact_id,
status: Invoice::Status::DRAFTED,
currency_code: currency_code,
due_date: due_date,
# Sub-total = sum over line items of unit amount (as BigDecimal) x quantity.
# NOTE(review): quantity is coerced with `to_i` here but passed through
# unchanged when the line item is created below — confirm both agree
# for non-integer input.
sub_total_amount: line_items.sum do |line_item|
BigDecimal(line_item[:unit_amount][:amount]) * line_item[:quantity].to_i
end,
sub_total_currency_code: currency_code,
created_at: current_utc_time,
updated_at: current_utc_time)
# Persist each line item through the invoice's `line_items` association
# and keep the created records for the event body.
invoice_line_items = line_items.map do |line_item|
invoice.line_items.create!(
description: line_item[:description],
unit_amount_amount: BigDecimal(line_item[:unit_amount][:amount]),
unit_amount_currency_code: line_item[:unit_amount][:currency_code],
quantity: line_item[:quantity])
end
# The event is created inside the same transaction as the invoice.
# According to the sequence diagrams on this page, Event's after_create
# callback calls Outboxer::Message.queue(messageable: event), so the
# outbox message would be committed or rolled back together with the
# invoice.
event = InvoiceDraftedEvent.create!(
user_id: user_id,
tenant_id: tenant_id,
created_at: current_utc_time,
eventable: invoice,
# Snapshot of the invoice built from the persisted records; amounts are
# written with `to_s` (presumably to keep decimal precision when the
# body is serialized — confirm against the Event model).
body: {
'invoice' => {
'id' => invoice.id,
'organisation_id' => organisation.id,
'contact_id' => contact.id,
'status' => invoice.status,
'currency_code' => invoice.currency_code,
'due_date' => invoice.due_date,
'line_items' => invoice_line_items.map do |invoice_line_item|
{
'description' => invoice_line_item.description,
'unit_amount_amount' => invoice_line_item.unit_amount_amount.to_s,
'unit_amount_currency_code' => invoice_line_item.unit_amount_currency_code,
'quantity' => invoice_line_item.quantity
}
end,
'sub_total' => {
'amount' => invoice.sub_total_amount.to_s,
'currency_code' => invoice.sub_total_currency_code } } })
end
# Return only identifiers: the new invoice id plus the id/type of the event.
{ id: invoice.id, events: [{ id: event.id, type: event.type }] }
end
end
end
sequenceDiagram
participant CLI as bin/outboxer_publisher
participant P as Outboxer::Publisher
participant DB as Outboxer::Database
participant Q as Queue
participant M as Outboxer::Models::Message
participant W as OutboxerIntegration::PublishMessageJob (Sidekiq)
CLI->>P: parse_cli_options
CLI->>DB: connect(config)
CLI->>P: publish_message(&block)
activate P
P->>P: create (publisher record)
P->>Q: initialize in-memory queue
loop while publishing
P->>M: buffer(limit: N)
M-->>P: messages (status=queued)
P->>Q: push(message)
loop for each message
Q->>P: pop()
activate P
P->>M: publishing(id)
P->>W: perform_async(message_id, messageable_id, messageable_type)
W-->>P: enqueued
P->>M: published(id)
deactivate P
end
end
P->>P: terminate (status=terminating)
P->>DB: disconnect
deactivate P