One library to rule them all - A powerful, extensible, and developer-friendly Go wrapper that simplifies message queue operations across multiple platforms. Build robust, scalable applications with consistent queue operations, regardless of your underlying message broker.
๐ฏ Universal Interface - Write once, run anywhere. Switch between queue providers without changing your code
โก Production Ready - Built-in retry mechanisms, dead letter queues, and error handling
๐ก๏ธ Type Safe - Strongly typed interfaces with comprehensive error handling
๐ง Extensible - Plugin architecture for custom middleware and queue providers
๐ Observable - Built-in logging and middleware support for monitoring
๐ Developer Experience - Intuitive API design with sensible defaults
- ๐ Quick Start
- ๐ซ Features
- ๐ ๏ธ Installation
- ๐ Basic Usage
- ๐ง Advanced Features
- ๐ฎ Examples
- ๐๏ธ Architecture
- ๐ Documentation
- ๐ค Contributing
- ๐ License
Get up and running in less than 5 minutes:
go get -u github.com/bxcodec/goqueue
package main
import (
"context"
"log"
"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
"github.com/bxcodec/goqueue/publisher"
"github.com/bxcodec/goqueue/interfaces"
)
func main() {
// Create queue service
queueSvc := goqueue.NewQueueService(
options.WithConsumer(myConsumer),
options.WithPublisher(myPublisher),
options.WithMessageHandler(handleMessage),
)
// Publish a message
queueSvc.Publish(context.Background(), interfaces.Message{
Data: map[string]interface{}{"hello": "world"},
Action: "user.created",
Topic: "users",
})
// Start consuming
queueSvc.Start(context.Background())
}
func handleMessage(ctx context.Context, m interfaces.InboundMessage) error {
log.Printf("Received: %v", m.Data)
return m.Ack(ctx) // Acknowledge successful processing
}
- Multi-Provider Support: Currently supports RabbitMQ (more coming soon!)
- Unified API: Consistent interface across all queue providers
- Type Safety: Strongly typed message structures
- Context Support: Full Go context integration for cancellation and timeouts
- Automatic Retries: Configurable retry mechanisms with exponential backoff
- Dead Letter Queues: Handle failed messages gracefully
- Circuit Breaker: Built-in protection against cascading failures
- Graceful Shutdown: Clean resource cleanup on application termination
- Middleware System: Extensible pipeline for message processing
- Custom Serialization: Support for JSON, Protocol Buffers, and custom formats
- Message Routing: Flexible topic and routing key patterns
- Batching: Efficient batch message processing
- Connection Pooling: Optimized connection management
- Structured Logging: Built-in zerolog integration
- Metrics Ready: Hooks for Prometheus, StatsD, and custom metrics
- Tracing Support: OpenTelemetry compatible
- Health Checks: Built-in health check endpoints
# Install the core library
go get -u github.com/bxcodec/goqueue
- Go 1.21 or higher
- Message broker (RabbitMQ supported, more coming soon)
package main
import (
"context"
"github.com/bxcodec/goqueue/publisher"
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// Connect to RabbitMQ
conn, _ := amqp.Dial("amqp://localhost:5672/")
// Create publisher
pub := publisher.NewPublisher(
publisherOpts.PublisherPlatformRabbitMQ,
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
Conn: conn,
PublisherChannelPoolSize: 5,
}),
publisherOpts.WithPublisherID("my-service"),
)
// Publish message
err := pub.Publish(context.Background(), interfaces.Message{
Data: map[string]interface{}{"user_id": 123, "action": "signup"},
Action: "user.created",
Topic: "users",
})
if err != nil {
log.Fatal(err)
}
}
package main
import (
"context"
"github.com/bxcodec/goqueue/consumer"
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)
func main() {
// Create consumer
cons := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithQueueName("user-events"),
consumerOpts.WithMaxRetryFailedMessage(3),
consumerOpts.WithBatchMessageSize(10),
)
// Start consuming
cons.Consume(context.Background(), messageHandler, metadata)
}
func messageHandler(ctx context.Context, msg interfaces.InboundMessage) error {
// Process your message
userData := msg.Data.(map[string]interface{})
// Business logic here
if err := processUser(userData); err != nil {
// Retry with exponential backoff
return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)
}
// Acknowledge successful processing
return msg.Ack(ctx)
}
GoQueue provides sophisticated retry mechanisms with multiple strategies:
// Exponential backoff retry
return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)
// Custom retry logic
return msg.RetryWithDelayFn(ctx, func(retryCount int64) int64 {
return retryCount * 2 // Custom delay calculation
})
// Move to dead letter queue after max retries
return msg.MoveToDeadLetterQueue(ctx)
Extend functionality with custom middleware:
// Custom logging middleware
func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) error {
start := time.Now()
err := next(ctx, m)
log.Printf("Message processed in %v", time.Since(start))
return err
}
}
}
// Apply middleware
cons := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithMiddlewares(
LoggingMiddleware(),
MetricsMiddleware(),
AuthMiddleware(),
),
)
Fine-tune your queue behavior:
cons := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithQueueName("high-priority-queue"),
consumerOpts.WithMaxRetryFailedMessage(5),
consumerOpts.WithBatchMessageSize(50),
consumerOpts.WithConsumerID("worker-01"),
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
ConsumerChannel: channel,
ReQueueChannel: requeueChannel,
QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{
Durable: true,
AutoDelete: false,
Exclusive: false,
},
}),
)
Explore our comprehensive examples:
- Basic Usage - Simple publish/consume example
- With Retries - Advanced retry mechanisms
Start RabbitMQ with Docker:
# Clone the repository
git clone https://github.com/bxcodec/goqueue.git
cd goqueue/examples/rabbitmq/basic
# Start RabbitMQ
docker-compose up -d
# Run the example
go run main.go
Automatic retry mechanism with exponential backoff and dead letter queue
- Interface Segregation: Clean, focused interfaces for different responsibilities
- Dependency Injection: Easy testing and swappable implementations
- Error Handling: Comprehensive error types and recovery mechanisms
- Performance: Optimized for high-throughput scenarios
- Extensibility: Plugin architecture for custom providers and middleware
Provider | Status | Features |
---|---|---|
RabbitMQ | ๐ Beta Version | Full feature support |
Google Pub/Sub | ๐ Planned | Coming soon |
AWS SQS + SNS | ๐ Planned | Coming soon |
GoQueue uses structured logging with zerolog:
import "github.com/bxcodec/goqueue"
// Setup basic logging (automatic when importing consumer/publisher)
// OR setup with custom configuration:
goqueue.SetupLoggingWithDefaults() // Pretty console output for development
Run the test suite:
# Unit tests
make test
# Integration tests with RabbitMQ
make integration-test
Explore our comprehensive guides for each system component:
Component | Description | Documentation |
---|---|---|
๐ Middleware | Extend functionality with custom logic | ๐ Middleware Guide |
๐จ Consumer | Reliable message consumption and processing | ๐ Consumer Guide |
๐ค Publisher | High-performance message publishing | ๐ Publisher Guide |
๐ RabbitMQ Retry | Advanced retry mechanisms for RabbitMQ | ๐ Retry Architecture |
- ๐ Full Documentation Index - Complete documentation overview
- ๐ง API Reference - Go package documentation
- ๐ฎ Examples - Working code examples
- ๐ Troubleshooting - Common issues and solutions
We welcome contributions! Here's how to get started:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature
) - Commit your changes (
git commit -m 'Add amazing feature'
) - Push to the branch (
git push origin feature/amazing-feature
) - Open a Pull Request
# Clone your fork
git clone https://github.com/yourusername/goqueue.git
cd goqueue
# Install dependencies
go mod download
# Run tests
make test
# Run linting
make lint
- ๐ New Queue Providers (Google Pub/Sub, SQS+SNS)
- ๐ ๏ธ Middleware Components (Metrics, Tracing, Auth)
- ๐ Documentation & Examples
- ๐งช Testing & Benchmarks
- ๐ Bug Fixes & Improvements
- ๐ Documentation: pkg.go.dev/github.com/bxcodec/goqueue
- ๐ Issues: GitHub Issues
- ๐ง Email: iman@tumorang.com
This project is licensed under the MIT License - see the LICENSE file for details.
- Thanks to all contributors
- Inspired by the Go community's best practices
- Built with โค๏ธ for the Go ecosystem