File tree Expand file tree Collapse file tree 1 file changed +15
-0
lines changed Expand file tree Collapse file tree 1 file changed +15
-0
lines changed Original file line number Diff line number Diff line change 11package rabbitmq
22
33import (
4+ "time"
5+
46 amqp "github.com/rabbitmq/amqp091-go"
57 "github.com/wagslane/go-rabbitmq/internal/logger"
68)
@@ -329,3 +331,16 @@ func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) {
329331
330332 options .QueueOptions .Args ["x-queue-type" ] = "quorum"
331333}
334+
335+ // WithConsumerOptionsQueueMessageExpiration sets the message expiration (TTL) for all messages in the queue.
336+ // This option defines how long a message can remain in the queue before it is discarded if not consumed.
337+ // The TTL is specified as a time.Duration and will be converted to milliseconds for RabbitMQ.
338+ // See https://www.rabbitmq.com/docs/ttl#per-queue-message-ttl
339+ func WithConsumerOptionsQueueMessageExpiration (ttl time.Duration ) func (* ConsumerOptions ) {
340+ return func (options * ConsumerOptions ) {
341+ if options .QueueOptions .Args == nil {
342+ options .QueueOptions .Args = Table {}
343+ }
344+ options .QueueOptions .Args ["x-message-ttl" ] = ttl .Milliseconds ()
345+ }
346+ }
You can’t perform that action at this time.
0 commit comments