From 578cfd795aa61035a316b91994b99df7302b8f9d Mon Sep 17 00:00:00 2001 From: Joe Cai Date: Wed, 30 Jul 2025 09:23:11 +1000 Subject: [PATCH] Add new method Publisher.NotifyPublishWithReturn for ordered return/confirmation handling - Add a new method Publisher.NotifyPublishWithReturn that ensures proper pairing of returns and confirmations according to RabbitMQ protocol where returns are immediately followed by confirmations for unroutable messages - Add comprehensive integration tests covering routable/unroutable scenarios with mandatory and non-mandatory publishing This addresses the issue where NotifyPublish() and NotifyReturn() handlers run in separate goroutines and can go out of order, making it difficult to properly link returned messages with their confirmations for reliable unrouted message handling. --- .github/workflows/tests.yml | 1 - Makefile | 6 + examples/publisher_confirm/main.go | 6 +- integration_test.go | 194 +++++++++++++++++++++++++++-- publish.go | 111 ++++++++++++++++- 5 files changed, 304 insertions(+), 14 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8e47620..56f6c9e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,7 +10,6 @@ jobs: runs-on: ubuntu-latest env: GOPROXY: "https://proxy.golang.org,direct" - ENABLE_DOCKER_INTEGRATION_TESTS: TRUE steps: - name: Set up Go diff --git a/Makefile b/Makefile index 94b7cce..cf9522d 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,12 @@ all: test vet staticcheck + +# Run all tests, including integration tests test: + go test -v -tags=integration ./... + +# Run unit tests only +unit-test: go test -v ./... vet: diff --git a/examples/publisher_confirm/main.go b/examples/publisher_confirm/main.go index f5aecaf..92895da 100644 --- a/examples/publisher_confirm/main.go +++ b/examples/publisher_confirm/main.go @@ -9,7 +9,7 @@ import ( "syscall" "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) func main() { @@ -38,6 +38,10 @@ func main() { log.Printf("message returned from server: %s", string(r.Body)) }) + publisher.NotifyPublishWithReturn(func(p rabbitmq.Confirmation, r rabbitmq.Return) { + log.Printf("message confirmed and returned from server: %s, deliveryTag=%d, ACK=%v", string(r.Body), p.DeliveryTag, p.Ack) + }) + // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) done := make(chan bool, 1) diff --git a/integration_test.go b/integration_test.go index 5df151e..0604cb1 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1,22 +1,17 @@ +//go:build integration + package rabbitmq import ( "context" "fmt" - "os" "os/exec" "strings" "testing" "time" ) -const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS` - func prepareDockerTest(t *testing.T) (connStr string) { - if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" { - t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag) - return - } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -97,7 +92,7 @@ func TestSimplePubSub(t *testing.T) { } defer consumer.CloseWithContext(context.Background()) - // Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full + // Set up a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full // it does not block. consumed := make(chan Delivery) defer close(consumed) @@ -154,3 +149,186 @@ func TestSimplePubSub(t *testing.T) { } } } + +// TestNotifyPublishWithReturn tests the NotifyPublishWithReturn functionality by publishing both +// routable and unroutable messages and verifying the handler receives the correct confirmations and returns. +func TestNotifyPublishWithReturn(t *testing.T) { + connStr := prepareDockerTest(t) + conn := waitForHealthyAmqp(t, connStr) + defer conn.Close() + + // Define a struct to hold the results of the publish operation + type publishResult struct { + confirmation Confirmation + returnInfo Return + hasReturn bool + } + + // Helper function to set up publisher and result channel + setupPublisher := func(t *testing.T) (*Publisher, chan publishResult) { + publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf))) + if err != nil { + t.Fatal("error creating publisher", err) + } + results := make(chan publishResult, 10) + + // Set up the handler + publisher.NotifyPublishWithReturn(func(p Confirmation, r Return) { + hasReturn := r.ReplyCode != 0 + t.Logf("NotifyPublishWithReturn called: ack=%v, hasReturn=%v, replyCode=%d", + p.Ack, hasReturn, r.ReplyCode) + + results <- publishResult{ + confirmation: p, + returnInfo: r, + hasReturn: hasReturn, + } + }) + + return publisher, results + } + + t.Run("UnroutableMandatoryMessage_ShouldBeAckedAndReturned", func(t *testing.T) { + publisher, results := setupPublisher(t) + defer publisher.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Publish to non-existent queue with mandatory=true (should return) + t.Logf("publishing to non-existent queue (should be returned)") + err := publisher.PublishWithContext( + ctx, + []byte("test message 1"), + []string{"non-existent-queue"}, + WithPublishOptionsMandatory, + ) + if err != nil { + t.Fatal("failed to publish to non-existent queue", err) + } + + // Wait for the return + confirmation + select { + case result := <-results: + if !result.hasReturn { + t.Fatal("expected a return for unroutable message, but got none") + } + if result.returnInfo.ReplyCode == 0 { + t.Fatal("expected non-zero reply code for returned message") + } + if !result.confirmation.Ack { + t.Fatal("expected message to be acked, but it was nacked") + } + t.Logf("correctly received return: replyCode=%d, replyText=%s", + result.returnInfo.ReplyCode, result.returnInfo.ReplyText) + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for return notification") + } + }) + + t.Run("RoutableMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) { + publisher, results := setupPublisher(t) + defer publisher.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Create a queue and publish to it (should succeed, no return) + consumerQueue := "test_queue_" + fmt.Sprintf("%d", time.Now().UnixNano()) + consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf))) + if err != nil { + t.Fatal("error creating consumer", err) + } + defer consumer.CloseWithContext(context.Background()) + + // Start consumer to ensure queue exists + consumed := make(chan Delivery, 1) + go func() { + err = consumer.Run(func(d Delivery) Action { + select { + case consumed <- d: + default: + } + return Ack + }) + if err != nil { + t.Log("consumer run failed", err) + } + }() + + // Wait for the consumer to be ready + time.Sleep(100 * time.Millisecond) + + t.Logf("publishing to existing queue (should succeed)") + err = publisher.PublishWithContext( + ctx, + []byte("test message 2"), + []string{consumerQueue}, + WithPublishOptionsMandatory, + ) + if err != nil { + t.Fatal("failed to publish to existing queue", err) + } + + // Wait for the confirmation (no return expected) + select { + case result := <-results: + if result.hasReturn { + t.Fatalf("unexpected return for routable message: replyCode=%d, replyText=%s", + result.returnInfo.ReplyCode, result.returnInfo.ReplyText) + } + if !result.confirmation.Ack { + t.Fatal("expected message to be acked, but it was nacked") + } + t.Logf("correctly received confirmation without return: ack=%v", result.confirmation.Ack) + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for confirmation notification") + } + + // Verify the message was actually consumed + select { + case d := <-consumed: + t.Logf("message successfully consumed: '%s'", string(d.Body)) + if string(d.Body) != "test message 2" { + t.Fatalf("expected message 'test message 2', got '%s'", string(d.Body)) + } + case <-time.After(time.Second * 3): + t.Fatal("timeout waiting for message consumption") + } + }) + + t.Run("UnroutableNonMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) { + publisher, results := setupPublisher(t) + defer publisher.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Publish without mandatory flag to non-existent queue (should succeed, no return) + t.Logf("publishing to non-existent queue without mandatory (should succeed)") + err := publisher.PublishWithContext( + ctx, + []byte("test message 3"), + []string{"another-non-existent-queue"}, + // No WithPublishOptionsMandatory - defaults to false + ) + if err != nil { + t.Fatal("failed to publish without mandatory", err) + } + + // Wait for the confirmation (no return expected) + select { + case result := <-results: + if result.hasReturn { + t.Fatalf("unexpected return for non-mandatory message: replyCode=%d, replyText=%s", + result.returnInfo.ReplyCode, result.returnInfo.ReplyText) + } + if !result.confirmation.Ack { + t.Fatal("expected non-mandatory message to be acked, but it was nacked") + } + t.Logf("correctly received confirmation without return for non-mandatory: ack=%v", result.confirmation.Ack) + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for non-mandatory confirmation notification") + } + }) +} diff --git a/publish.go b/publish.go index 06f9cb0..11994c6 100644 --- a/publish.go +++ b/publish.go @@ -53,9 +53,10 @@ type Publisher struct { disablePublishDueToBlocked bool disablePublishDueToBlockedMu *sync.RWMutex - handlerMu *sync.Mutex - notifyReturnHandler func(r Return) - notifyPublishHandler func(p Confirmation) + handlerMu *sync.Mutex + notifyReturnHandler func(r Return) + notifyPublishHandler func(p Confirmation) + notifyPublishWithReturnHandler func(p Confirmation, r Return) options PublisherOptions } @@ -121,6 +122,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe } publisher.startReturnHandler() publisher.startPublishHandler() + publisher.startPublishWithReturnHandler() } }() @@ -320,6 +322,40 @@ func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { } } +// NotifyPublishWithReturn registers a listener for publish confirmation events with optional return information, +// delivered in the correct order according to RabbitMQ protocol. Must set ConfirmPublishings option. +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind. +// +// This method is particularly useful when publishing with mandatory or immediate flags, as it allows you to +// handle both the return (if unroutable) and the associated confirmation in a single handler with proper ordering. +// +// Handler receives: +// - Confirmation: Always present for every published message (ack/nack) +// - Return: Only present if the message was unroutable with mandatory/immediate flags (empty Return{} otherwise) +// +// For successfully routed messages: +// - Only confirmation is received, Return will be empty +// +// For unrouted mandatory or immediate messages: +// - Return is received first (as per RabbitMQ protocol), followed immediately by confirmation +// - Both are delivered to the handler together in the correct pairing +// +// See github.com/rabbitmq/amqp091-go documentation: +// https://pkg.go.dev/github.com/rabbitmq/amqp091-go#Channel.Confirm +// "Unroutable mandatory or immediate messages are acknowledged immediately after any Channel.NotifyReturn +// listeners have been notified." +func (publisher *Publisher) NotifyPublishWithReturn(handler func(p Confirmation, r Return)) { + publisher.handlerMu.Lock() + shouldStart := publisher.notifyPublishWithReturnHandler == nil + publisher.notifyPublishWithReturnHandler = handler + publisher.handlerMu.Unlock() + + if shouldStart { + publisher.startPublishWithReturnHandler() + } +} + func (publisher *Publisher) startReturnHandler() { publisher.handlerMu.Lock() if publisher.notifyReturnHandler == nil { @@ -343,7 +379,11 @@ func (publisher *Publisher) startPublishHandler() { return } publisher.handlerMu.Unlock() - publisher.chanManager.ConfirmSafe(false) + err := publisher.chanManager.ConfirmSafe(false) + if err != nil { + publisher.options.Logger.Errorf("failed to enable confirm mode for publish handler: %v", err) + return + } go func() { confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) @@ -355,3 +395,66 @@ func (publisher *Publisher) startPublishHandler() { } }() } + +func (publisher *Publisher) startPublishWithReturnHandler() { + publisher.handlerMu.Lock() + if publisher.notifyPublishWithReturnHandler == nil { + publisher.handlerMu.Unlock() + return + } + publisher.handlerMu.Unlock() + + // Enable confirm mode for this channel + err := publisher.chanManager.ConfirmSafe(false) + if err != nil { + publisher.options.Logger.Errorf("failed to enable confirm mode for return publish in order handler: %v", err) + return + } + + go func() { + returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return)) + confirmations := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation)) + + for { + select { + case ret, ok := <-returns: + if !ok { + // returns channel closed, likely due to publisher being closed + publisher.options.Logger.Warnf("returns channel closed, stopping return handler") + returns = nil + continue + } + + // According to AMQP 0.9.1 protocol, when a message is returned, + // the return is immediately followed by a confirmation. + // We must consume the next confirmation to pair with this return. + select { + case conf, ok := <-confirmations: + if !ok { + publisher.options.Logger.Warnf("confirmations channel closed while waiting for confirmation after return") + return + } + + // Call handler with both return and confirmation + go publisher.notifyPublishWithReturnHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }, Return{ret}) + } + + case conf, ok := <-confirmations: + if !ok { + publisher.options.Logger.Warnf("confirmations channel closed") + // confirmations channel closed, likely due to publisher being closed + return + } + + // This is a confirmation without a return (successful delivery) + go publisher.notifyPublishWithReturnHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }, Return{}) + } + } + }() +}