Skip to content

GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.

License

Notifications You must be signed in to change notification settings

bxcodec/goqueue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

ย 

History

28 Commits
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

Repository files navigation

๐Ÿš€ GoQueue - Universal Go Message Queue Library

Go Reference Go Report Card License: MIT GitHub stars

One library to rule them all - A powerful, extensible, and developer-friendly Go wrapper that simplifies message queue operations across multiple platforms. Build robust, scalable applications with consistent queue operations, regardless of your underlying message broker.

โœจ Why GoQueue?

Core Concept

๐ŸŽฏ Universal Interface - Write once, run anywhere. Switch between queue providers without changing your code
โšก Production Ready - Built-in retry mechanisms, dead letter queues, and error handling
๐Ÿ›ก๏ธ Type Safe - Strongly typed interfaces with comprehensive error handling
๐Ÿ”ง Extensible - Plugin architecture for custom middleware and queue providers
๐Ÿ“Š Observable - Built-in logging and middleware support for monitoring
๐Ÿš€ Developer Experience - Intuitive API design with sensible defaults


๐Ÿ“‹ Table of Contents


๐Ÿš€ Quick Start

Get up and running in less than 5 minutes:

go get -u github.com/bxcodec/goqueue
package main

import (
	"context"
    "log"

	"github.com/bxcodec/goqueue"
	"github.com/bxcodec/goqueue/consumer"
    "github.com/bxcodec/goqueue/publisher"
	"github.com/bxcodec/goqueue/interfaces"
)

func main() {

	// Create queue service
    queueSvc := goqueue.NewQueueService(
        options.WithConsumer(myConsumer),
        options.WithPublisher(myPublisher),
        options.WithMessageHandler(handleMessage),
    )

    // Publish a message
    queueSvc.Publish(context.Background(), interfaces.Message{
        Data:   map[string]interface{}{"hello": "world"},
        Action: "user.created",
        Topic:  "users",
    })

    // Start consuming
    queueSvc.Start(context.Background())
}

func handleMessage(ctx context.Context, m interfaces.InboundMessage) error {
    log.Printf("Received: %v", m.Data)
    return m.Ack(ctx) // Acknowledge successful processing
}

๐Ÿ’ซ Features

๐ŸŽฏ Core Features

  • Multi-Provider Support: Currently supports RabbitMQ (more coming soon!)
  • Unified API: Consistent interface across all queue providers
  • Type Safety: Strongly typed message structures
  • Context Support: Full Go context integration for cancellation and timeouts

๐Ÿ›ก๏ธ Reliability & Resilience

  • Automatic Retries: Configurable retry mechanisms with exponential backoff
  • Dead Letter Queues: Handle failed messages gracefully
  • Circuit Breaker: Built-in protection against cascading failures
  • Graceful Shutdown: Clean resource cleanup on application termination

๐Ÿ”ง Advanced Capabilities

  • Middleware System: Extensible pipeline for message processing
  • Custom Serialization: Support for JSON, Protocol Buffers, and custom formats
  • Message Routing: Flexible topic and routing key patterns
  • Batching: Efficient batch message processing
  • Connection Pooling: Optimized connection management

๐Ÿ“Š Observability

  • Structured Logging: Built-in zerolog integration
  • Metrics Ready: Hooks for Prometheus, StatsD, and custom metrics
  • Tracing Support: OpenTelemetry compatible
  • Health Checks: Built-in health check endpoints

๐Ÿ› ๏ธ Installation

# Install the core library
go get -u github.com/bxcodec/goqueue

Requirements

  • Go 1.21 or higher
  • Message broker (RabbitMQ supported, more coming soon)

๐Ÿ“– Basic Usage

๐Ÿš€ Publisher Example

package main

import (
    "context"
    "github.com/bxcodec/goqueue/publisher"
    publisherOpts "github.com/bxcodec/goqueue/options/publisher"
    amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    // Connect to RabbitMQ
    conn, _ := amqp.Dial("amqp://localhost:5672/")

    // Create publisher
    pub := publisher.NewPublisher(
		publisherOpts.PublisherPlatformRabbitMQ,
		publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
            Conn:                     conn,
			PublisherChannelPoolSize: 5,
		}),
        publisherOpts.WithPublisherID("my-service"),
    )

    // Publish message
    err := pub.Publish(context.Background(), interfaces.Message{
        Data:   map[string]interface{}{"user_id": 123, "action": "signup"},
        Action: "user.created",
        Topic:  "users",
    })
	if err != nil {
        log.Fatal(err)
    }
}

๐Ÿ“จ Consumer Example

package main

import (
    "context"
    "github.com/bxcodec/goqueue/consumer"
    consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)

func main() {
    // Create consumer
    cons := consumer.NewConsumer(
		consumerOpts.ConsumerPlatformRabbitMQ,
        consumerOpts.WithQueueName("user-events"),
		consumerOpts.WithMaxRetryFailedMessage(3),
        consumerOpts.WithBatchMessageSize(10),
    )

    // Start consuming
    cons.Consume(context.Background(), messageHandler, metadata)
}

func messageHandler(ctx context.Context, msg interfaces.InboundMessage) error {
    // Process your message
    userData := msg.Data.(map[string]interface{})

    // Business logic here
    if err := processUser(userData); err != nil {
        // Retry with exponential backoff
        return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)
    }

    // Acknowledge successful processing
    return msg.Ack(ctx)
}

๐Ÿ”ง Advanced Features

๐Ÿ”„ Retry Mechanisms

GoQueue provides sophisticated retry mechanisms with multiple strategies:

// Exponential backoff retry
return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn)

// Custom retry logic
return msg.RetryWithDelayFn(ctx, func(retryCount int64) int64 {
    return retryCount * 2 // Custom delay calculation
})

// Move to dead letter queue after max retries
return msg.MoveToDeadLetterQueue(ctx)

๐Ÿ”Œ Middleware System

Extend functionality with custom middleware:

// Custom logging middleware
func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc {
    return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
        return func(ctx context.Context, m interfaces.InboundMessage) error {
            start := time.Now()
            err := next(ctx, m)
            log.Printf("Message processed in %v", time.Since(start))
            return err
        }
    }
}

// Apply middleware
cons := consumer.NewConsumer(
    consumerOpts.ConsumerPlatformRabbitMQ,
    consumerOpts.WithMiddlewares(
        LoggingMiddleware(),
        MetricsMiddleware(),
        AuthMiddleware(),
    ),
)

๐ŸŽ›๏ธ Configuration Options

Fine-tune your queue behavior:

cons := consumer.NewConsumer(
    consumerOpts.ConsumerPlatformRabbitMQ,
    consumerOpts.WithQueueName("high-priority-queue"),
    consumerOpts.WithMaxRetryFailedMessage(5),
    consumerOpts.WithBatchMessageSize(50),
    consumerOpts.WithConsumerID("worker-01"),
    consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
        ConsumerChannel: channel,
        ReQueueChannel:  requeueChannel,
        QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{
            Durable:    true,
            AutoDelete: false,
            Exclusive:  false,
        },
    }),
)

๐ŸŽฎ Examples

๐Ÿ“ Complete Examples

Explore our comprehensive examples:

๐Ÿฐ RabbitMQ Quick Setup

Start RabbitMQ with Docker:

# Clone the repository
git clone https://github.com/bxcodec/goqueue.git
cd goqueue/examples/rabbitmq/basic

# Start RabbitMQ
docker-compose up -d

# Run the example
go run main.go

๐Ÿ”„ Retry Architecture

GoQueue Retry Architecture

Automatic retry mechanism with exponential backoff and dead letter queue


๐Ÿ—๏ธ Architecture

๐ŸŽฏ Design Principles

  • Interface Segregation: Clean, focused interfaces for different responsibilities
  • Dependency Injection: Easy testing and swappable implementations
  • Error Handling: Comprehensive error types and recovery mechanisms
  • Performance: Optimized for high-throughput scenarios
  • Extensibility: Plugin architecture for custom providers and middleware

๐Ÿงฉ Core Components

Core Concept

๐Ÿ“ฆ Provider Support

Provider Status Features
RabbitMQ ๐Ÿ”„ Beta Version Full feature support
Google Pub/Sub ๐Ÿ“‹ Planned Coming soon
AWS SQS + SNS ๐Ÿ“‹ Planned Coming soon

๐Ÿ”ง Configuration

๐Ÿ“ Logging Setup

GoQueue uses structured logging with zerolog:

import "github.com/bxcodec/goqueue"

// Setup basic logging (automatic when importing consumer/publisher)
// OR setup with custom configuration:
goqueue.SetupLoggingWithDefaults() // Pretty console output for development

๐Ÿงช Testing

Run the test suite:

# Unit tests
make test

# Integration tests with RabbitMQ
make integration-test

๐Ÿ“š Documentation

๐Ÿ“– Component Documentation

Explore our comprehensive guides for each system component:

Component Description Documentation
๐Ÿ”Œ Middleware Extend functionality with custom logic ๐Ÿ“– Middleware Guide
๐Ÿ“จ Consumer Reliable message consumption and processing ๐Ÿ“– Consumer Guide
๐Ÿ“ค Publisher High-performance message publishing ๐Ÿ“– Publisher Guide
๐Ÿ”„ RabbitMQ Retry Advanced retry mechanisms for RabbitMQ ๐Ÿ“– Retry Architecture

๐ŸŽฏ Quick Links


๐Ÿค Contributing

We welcome contributions! Here's how to get started:

๐Ÿš€ Quick Contribution Guide

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

๐Ÿ“‹ Development Setup

# Clone your fork
git clone https://github.com/yourusername/goqueue.git
cd goqueue

# Install dependencies
go mod download

# Run tests
make test

# Run linting
make lint

๐ŸŽฏ Contribution Areas

  • ๐Ÿ”Œ New Queue Providers (Google Pub/Sub, SQS+SNS)
  • ๐Ÿ› ๏ธ Middleware Components (Metrics, Tracing, Auth)
  • ๐Ÿ“š Documentation & Examples
  • ๐Ÿงช Testing & Benchmarks
  • ๐Ÿ› Bug Fixes & Improvements

๐Ÿ“ž Support & Community


๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.


๐Ÿ™ Acknowledgments

  • Thanks to all contributors
  • Inspired by the Go community's best practices
  • Built with โค๏ธ for the Go ecosystem

About

GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.

Topics

Resources

License

Stars

Watchers

Forks

Contributors 2

  •  
  •