Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async_handoff_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"testing"
"time"

"github.com/redis/go-redis/v9/maintnotifications"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/logging"
"github.com/redis/go-redis/v9/maintnotifications"
)

// mockNetConn implements net.Conn for testing
Expand Down
6 changes: 3 additions & 3 deletions internal/maintnotifications/logs/log_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,11 @@ func RemovingConnectionFromPool(connID uint64, reason error) string {
})
}

func NoPoolProvidedCannotRemove(connID uint64, reason error) string {
message := fmt.Sprintf("conn[%d] %s due to: %v", connID, NoPoolProvidedMessageCannotRemoveMessage, reason)
func NoPoolProvidedCannotRemove(connID uint64) string {
message := fmt.Sprintf("conn[%d] %s", connID, NoPoolProvidedMessageCannotRemoveMessage)
return appendJSONIfDebug(message, map[string]interface{}{
"connID": connID,
"reason": reason.Error(),
"reason": nil,
})
}

Expand Down
34 changes: 23 additions & 11 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/util"
"github.com/redis/go-redis/v9/logging"
)

var (
Expand Down Expand Up @@ -115,6 +116,9 @@ type Options struct {
// DialerRetryTimeout is the backoff duration between retry attempts.
// Default: 100ms
DialerRetryTimeout time.Duration

// Optional logger for connection pool operations.
Logger *logging.CustomLogger
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -223,7 +227,7 @@ func (p *ConnPool) checkMinIdleConns() {
p.idleConnsLen.Add(-1)

p.freeTurn()
internal.Logger.Printf(context.Background(), "addIdleConn panic: %+v", err)
p.logger().Errorf(context.Background(), "addIdleConn panic: %+v", err)
}
}()

Expand Down Expand Up @@ -379,7 +383,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
return cn, nil
}

internal.Logger.Printf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
p.logger().Errorf(ctx, "redis: connection pool: failed to dial after %d attempts: %v", maxRetries, lastErr)
// All retries failed - handle error tracking
p.setLastDialError(lastErr)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
Expand Down Expand Up @@ -452,7 +456,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {

for {
if attempts >= getAttempts {
internal.Logger.Printf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
p.logger().Errorf(ctx, "redis: connection pool: was not able to get a healthy connection after %d attempts", attempts)
break
}
attempts++
Expand All @@ -479,12 +483,12 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
if hookManager != nil {
acceptConn, err := hookManager.ProcessOnGet(ctx, cn, false)
if err != nil {
internal.Logger.Printf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
p.logger().Errorf(ctx, "redis: connection pool: failed to process idle connection by hook: %v", err)
_ = p.CloseConn(cn)
continue
}
if !acceptConn {
internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
p.logger().Errorf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
p.Put(ctx, cn)
cn = nil
continue
Expand All @@ -509,7 +513,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
// this should not happen with a new connection, but we handle it gracefully
if err != nil || !acceptConn {
// Failed to process connection, discard it
internal.Logger.Printf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
p.logger().Errorf(ctx, "redis: connection pool: failed to process new connection conn[%d] by hook: accept=%v, err=%v", newcn.GetID(), acceptConn, err)
_ = p.CloseConn(newcn)
return nil, err
}
Expand Down Expand Up @@ -703,7 +707,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {

// If we exhausted all attempts without finding a usable connection, return nil
if attempts > 1 && attempts >= maxAttempts && int32(attempts) >= p.poolSize.Load() {
internal.Logger.Printf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
p.logger().Errorf(context.Background(), "redis: connection pool: failed to get a usable connection after %d attempts", attempts)
return nil, nil
}

Expand All @@ -720,7 +724,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
// Peek at the reply type to check if it's a push notification
if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush {
// Not a push notification or error peeking, remove connection
internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it")
p.logger().Errorf(ctx, "Conn has unread data (not push notification), removing it")
p.Remove(ctx, cn, err)
}
// It's a push notification, allow pooling (client will handle it)
Expand All @@ -733,7 +737,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if hookManager != nil {
shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn)
if err != nil {
internal.Logger.Printf(ctx, "Connection hook error: %v", err)
p.logger().Errorf(ctx, "Connection hook error: %v", err)
p.Remove(ctx, cn, err)
return
}
Expand Down Expand Up @@ -835,7 +839,7 @@ func (p *ConnPool) removeConn(cn *Conn) {
// this can be idle conn
for idx, ic := range p.idleConns {
if ic.GetID() == cid {
internal.Logger.Printf(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
p.logger().Infof(context.Background(), "redis: connection pool: removing idle conn[%d]", cid)
p.idleConns = append(p.idleConns[:idx], p.idleConns[idx+1:]...)
p.idleConnsLen.Add(-1)
break
Expand Down Expand Up @@ -951,7 +955,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
// For RESP3 connections with push notifications, we allow some buffered data
// The client will process these notifications before using the connection
internal.Logger.Printf(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
p.logger().Infof(context.Background(), "push: conn[%d] has buffered data, likely push notifications - will be processed by client", cn.GetID())
return true // Connection is healthy, client will handle notifications
}
return false // Unexpected data, not push notifications, connection is unhealthy
Expand All @@ -961,3 +965,11 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
}
return true
}

func (p *ConnPool) logger() *logging.CustomLogger {
var logger *logging.CustomLogger
if p.cfg != nil && p.cfg.Logger != nil {
logger = p.cfg.Logger
}
return logger
}
144 changes: 144 additions & 0 deletions logging/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package logging

import (
"context"
"fmt"
)

// CustomLogger is a logger interface with leveled logging methods.
//
// This interface can be implemented by custom loggers to provide leveled logging.
type CustomLogger struct {
logger LoggerWithLevel
loggerLevel *LogLevelT
printfAdapter PrintfAdapter
}

func NewCustomLogger(logger LoggerWithLevel, opts ...CustomLoggerOption) *CustomLogger {
cl := &CustomLogger{
logger: logger,
}
for _, opt := range opts {
opt(cl)
}
return cl
}

type CustomLoggerOption func(*CustomLogger)

func WithPrintfAdapter(adapter PrintfAdapter) CustomLoggerOption {
return func(cl *CustomLogger) {
cl.printfAdapter = adapter
}
}

func WithLoggerLevel(level LogLevelT) CustomLoggerOption {
return func(cl *CustomLogger) {
cl.loggerLevel = &level
}
}

// PrintfAdapter is a function that converts Printf-style log messages into structured log messages.
// It can be used to extract key-value pairs from the formatted message.
type PrintfAdapter func(ctx context.Context, format string, v ...any) (context.Context, string, []any)

// Error is a structured error level logging method with context and arguments.
func (cl *CustomLogger) Error(ctx context.Context, msg string, args ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Errorf(ctx, msg, args...)
return
}
cl.logger.ErrorContext(ctx, msg, args...)
}

func (cl *CustomLogger) Errorf(ctx context.Context, format string, v ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Errorf(ctx, format, v...)
return
}
cl.logger.ErrorContext(ctx, format, v...)
}

// Warn is a structured warning level logging method with context and arguments.
func (cl *CustomLogger) Warn(ctx context.Context, msg string, args ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Warnf(ctx, msg, args...)
return
}
cl.logger.WarnContext(ctx, msg, args...)
}

func (cl *CustomLogger) Warnf(ctx context.Context, format string, v ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Warnf(ctx, format, v...)
return
}
cl.logger.WarnContext(cl.printfToStructured(ctx, format, v...))
}

// Info is a structured info level logging method with context and arguments.
func (cl *CustomLogger) Info(ctx context.Context, msg string, args ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Infof(ctx, msg, args...)
return
}
cl.logger.InfoContext(ctx, msg, args...)
}

// Debug is a structured debug level logging method with context and arguments.
func (cl *CustomLogger) Debug(ctx context.Context, msg string, args ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Debugf(ctx, msg, args...)
return
}
cl.logger.DebugContext(ctx, msg, args...)
}

func (cl *CustomLogger) Infof(ctx context.Context, format string, v ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Infof(ctx, format, v...)
return
}

cl.logger.InfoContext(cl.printfToStructured(ctx, format, v...))
}

func (cl *CustomLogger) Debugf(ctx context.Context, format string, v ...any) {
if cl == nil || cl.logger == nil {
legacyLoggerWithLevel.Debugf(ctx, format, v...)
return
}
cl.logger.DebugContext(cl.printfToStructured(ctx, format, v...))
}

func (cl *CustomLogger) printfToStructured(ctx context.Context, format string, v ...any) (context.Context, string, []any) {
if cl.printfAdapter != nil {
return cl.printfAdapter(ctx, format, v...)
}
return ctx, fmt.Sprintf(format, v...), nil
}

func (cl *CustomLogger) Enabled(ctx context.Context, level LogLevelT) bool {
if cl.loggerLevel != nil {
return level >= *cl.loggerLevel
}

return legacyLoggerWithLevel.Enabled(ctx, level)
}

// LoggerWithLevel is a logger interface with leveled logging methods.
//
// [slog.Logger] from the standard library satisfies this interface.
type LoggerWithLevel interface {
// InfoContext logs an info level message
InfoContext(ctx context.Context, format string, v ...any)

// WarnContext logs a warning level message
WarnContext(ctx context.Context, format string, v ...any)

// Debugf logs a debug level message
DebugContext(ctx context.Context, format string, v ...any)

// Errorf logs an error level message
ErrorContext(ctx context.Context, format string, v ...any)
}
91 changes: 91 additions & 0 deletions logging/legacy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package logging

import (
"context"

"github.com/redis/go-redis/v9/internal"
)

// legacyLoggerAdapter is a logger that implements [LoggerWithLevel] interface
// using the global [internal.Logger] and [internal.LogLevel] variables.
type legacyLoggerAdapter struct{}

var _ LoggerWithLevel = (*legacyLoggerAdapter)(nil)

// structuredToPrintf converts a structured log message and key-value pairs into something a Printf-style logger can understand.
func (l *legacyLoggerAdapter) structuredToPrintf(msg string, v ...any) (string, []any) {
format := msg
var args []any

for i := 0; i < len(v); i += 2 {
if i+1 >= len(v) {
break
}
format += " %v=%v"
args = append(args, v[i], v[i+1])
}

return format, args
}

func (l legacyLoggerAdapter) Errorf(ctx context.Context, format string, v ...any) {
internal.Logger.Printf(ctx, format, v...)
}

func (l *legacyLoggerAdapter) ErrorContext(ctx context.Context, msg string, args ...any) {
format, v := l.structuredToPrintf(msg, args...)
l.Errorf(ctx, format, v...)
}

func (l *legacyLoggerAdapter) WarnContext(ctx context.Context, msg string, args ...any) {
format, v := l.structuredToPrintf(msg, args...)
l.Warnf(ctx, format, v...)
}

func (l *legacyLoggerAdapter) Warnf(ctx context.Context, format string, v ...any) {
if !internal.LogLevel.WarnOrAbove() {
// Skip logging
return
}
internal.Logger.Printf(ctx, format, v...)
}

func (l *legacyLoggerAdapter) InfoContext(ctx context.Context, msg string, args ...any) {
format, v := l.structuredToPrintf(msg, args...)
l.Infof(ctx, format, v...)
}

func (l *legacyLoggerAdapter) Infof(ctx context.Context, format string, v ...any) {
if !internal.LogLevel.InfoOrAbove() {
// Skip logging
return
}
internal.Logger.Printf(ctx, format, v...)
}

func (l *legacyLoggerAdapter) DebugContext(ctx context.Context, msg string, args ...any) {
format, v := l.structuredToPrintf(msg, args...)
l.Debugf(ctx, format, v...)
}

func (l *legacyLoggerAdapter) Debugf(ctx context.Context, format string, v ...any) {
if !internal.LogLevel.DebugOrAbove() {
// Skip logging
return
}
internal.Logger.Printf(ctx, format, v...)
}

func (l *legacyLoggerAdapter) Enabled(ctx context.Context, level LogLevelT) bool {
switch level {
case LogLevelDebug:
return internal.LogLevel.DebugOrAbove()
case LogLevelWarn:
return internal.LogLevel.WarnOrAbove()
case LogLevelInfo:
return internal.LogLevel.InfoOrAbove()
}
return true
}

var legacyLoggerWithLevel = &legacyLoggerAdapter{}
1 change: 1 addition & 0 deletions logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ func (l *filterLogger) Printf(ctx context.Context, format string, v ...interface
return
}
}

Loading
Loading