Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
63ee9fe
refactor: idle pool and fix non-deterministic teardown of idle cleanu…
GeorgeMac Oct 14, 2025
09bf7a4
fix(pool): close connection after draining
GeorgeMac Oct 14, 2025
5a7adc7
refactor(pool): peek at min entry to check expiration and avoid exces…
GeorgeMac Oct 15, 2025
d1b7e47
refactor(pool): adjust cleanup ticker duration based on pools minimum
GeorgeMac Oct 15, 2025
4e9d535
fix(pool): only skip or remove min when pool at capacity on Put
GeorgeMac Oct 15, 2025
e8faaa1
test(pool): update test to ensure idle capacity is assert properly
GeorgeMac Oct 15, 2025
4742580
fix(clickhouse): use atomic bool for closed flag
GeorgeMac Oct 15, 2025
d7d4d5f
fix(clickhouse): return context error instead of err acquire context …
GeorgeMac Oct 15, 2025
9a9ad4a
fix(tests): use context cancel cause to send specific dial timeout error
GeorgeMac Oct 15, 2025
88d5d16
refactor(pool): replace min heap with circular queue / buffer
GeorgeMac Oct 16, 2025
746e196
fix(pool): close bad conn on put and on release when closed
GeorgeMac Oct 16, 2025
66068f4
test: add syntest around connection open and close
GeorgeMac Oct 28, 2025
628c073
Merge branch 'main' into gm/idle-connection-cleanup-leak
kavirajk Oct 30, 2025
3703c45
Merge branch 'main' into gm/idle-connection-cleanup-leak
kavirajk Nov 17, 2025
b05ec25
refactor: rename idlePool to connPool
GeorgeMac Nov 20, 2025
2d621c4
refactor(circular): rename Compact to DeleteFunc
GeorgeMac Nov 20, 2025
b3d49b3
refactor: rename tests referring to idle pool to conn pool
GeorgeMac Nov 20, 2025
155cee1
test: ensure drain pool routine removes and closes expired conns
GeorgeMac Nov 20, 2025
6b2acda
fix(test): remove double call to wg.Done
GeorgeMac Nov 20, 2025
5e8ba58
refactor(circular): unexport Queue.All to Queue.all
GeorgeMac Nov 20, 2025
3727502
chore: add documentation to connection pool
GeorgeMac Nov 20, 2025
a9eb436
feat: add context cancel check to *clickhouse.dial
GeorgeMac Nov 20, 2025
cb9cc6f
test: add test around clickhouse acquire and release
GeorgeMac Nov 24, 2025
89111f6
refactor(pool): remove unnecessary closed check in Get body loop
GeorgeMac Nov 24, 2025
4db0968
refactor(pool): return non-nil error when pool is empty
GeorgeMac Nov 25, 2025
dc0eae1
Merge branch 'main' into gm/idle-connection-cleanup-leak
GeorgeMac Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 56 additions & 95 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -34,6 +35,7 @@ var (
ErrBindMixedParamsFormats = errors.New("clickhouse [bind]: mixed named, numeric or positional parameters")
ErrAcquireConnNoAddress = errors.New("clickhouse: no valid address supplied")
ErrServerUnexpectedData = errors.New("code: 101, message: Unexpected packet Data received from client")
ErrConnectionClosed = errors.New("clickhouse: connection is closed")
)

type OpError struct {
Expand Down Expand Up @@ -67,12 +69,13 @@ func Open(opt *Options) (driver.Conn, error) {
o := opt.setDefaults()

conn := &clickhouse{
opt: o,
idle: make(chan nativeTransport, o.MaxIdleConns),
open: make(chan struct{}, o.MaxOpenConns),
exit: make(chan struct{}),
opt: o,
idle: newConnPool(o.ConnMaxLifetime, o.MaxIdleConns),
open: make(chan struct{}, o.MaxOpenConns),
closeOnce: &sync.Once{},
closed: &atomic.Bool{},
}
go conn.startAutoCloseIdleConnections()

return conn, nil
}

Expand Down Expand Up @@ -101,10 +104,13 @@ type nativeTransportRelease func(nativeTransport, error)

type clickhouse struct {
opt *Options
idle chan nativeTransport
open chan struct{}
exit chan struct{}
connID int64

idle *connPool
open chan struct{}

closeOnce *sync.Once
closed *atomic.Bool
}

func (clickhouse) Contributors() []string {
Expand Down Expand Up @@ -226,13 +232,18 @@ func (ch *clickhouse) Ping(ctx context.Context) (err error) {
func (ch *clickhouse) Stats() driver.Stats {
return driver.Stats{
Open: len(ch.open),
Idle: len(ch.idle),
MaxOpenConns: cap(ch.open),
MaxIdleConns: cap(ch.idle),

Idle: ch.idle.Len(),
MaxIdleConns: ch.idle.Cap(),
}
}

func (ch *clickhouse) dial(ctx context.Context) (conn nativeTransport, err error) {
if err := ctx.Err(); err != nil {
return nil, err
}

connID := int(atomic.AddInt64(&ch.connID, 1))

dialFunc := func(ctx context.Context, addr string, opt *Options) (DialResult, error) {
Expand Down Expand Up @@ -286,87 +297,46 @@ func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dia
}

func (ch *clickhouse) acquire(ctx context.Context) (conn nativeTransport, err error) {
timer := time.NewTimer(ch.opt.DialTimeout)
defer timer.Stop()
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
if ch.closed.Load() {
return nil, ErrConnectionClosed
}

ctx, cancel := context.WithTimeoutCause(ctx, ch.opt.DialTimeout, ErrAcquireConnTimeout)
defer cancel()

select {
case <-timer.C:
return nil, ErrAcquireConnTimeout
case <-ctx.Done():
return nil, ctx.Err()
case ch.open <- struct{}{}:
case <-ctx.Done():
return nil, context.Cause(ctx)
}
select {
case <-timer.C:
select {
case <-ch.open:
default:
}
return nil, ErrAcquireConnTimeout
case conn := <-ch.idle:
if conn.isBad() {
conn.close()
if conn, err = ch.dial(ctx); err != nil {
select {
case <-ch.open:
default:
}
return nil, err
}

conn, err = ch.idle.Get(ctx)
if err != nil && !errors.Is(err, errQueueEmpty) {
return nil, err
}

if err == nil && conn != nil {
if !conn.isBad() {
conn.setReleased(false)
conn.debugf("[acquired from pool]")
return conn, nil
}
conn.setReleased(false)
conn.debugf("[acquired from pool]")
return conn, nil
default:

conn.close()
}

if conn, err = ch.dial(ctx); err != nil {
select {
case <-ch.open:
default:
}

return nil, err
}

conn.debugf("[acquired new]")
return conn, nil
}

func (ch *clickhouse) startAutoCloseIdleConnections() {
ticker := time.NewTicker(ch.opt.ConnMaxLifetime)
defer ticker.Stop()

for {
select {
case <-ticker.C:
ch.closeIdleExpired()
case <-ch.exit:
return
}
}
}

func (ch *clickhouse) closeIdleExpired() {
cutoff := time.Now().Add(-ch.opt.ConnMaxLifetime)
for {
select {
case conn := <-ch.idle:
if conn.connectedAtTime().Before(cutoff) {
conn.close()
} else {
select {
case ch.idle <- conn:
default:
conn.close()
}
return
}
default:
return
}
}
}

func (ch *clickhouse) release(conn nativeTransport, err error) {
Expand Down Expand Up @@ -401,28 +371,19 @@ func (ch *clickhouse) release(conn nativeTransport, err error) {
conn.freeBuffer()
}

select {
case ch.idle <- conn:
default:
conn.debugf("[close: idle pool full %d/%d]", len(ch.idle), cap(ch.idle))
if ch.closed.Load() {
conn.close()
return
}

ch.idle.Put(conn)
}

func (ch *clickhouse) Close() error {
for {
select {
case conn := <-ch.idle:
conn.debugf("[close: closing pool]")
conn.close()
default:
// In rare cases, close may be called multiple times, don't block
//TODO: add proper close flag to indicate this pool is unusable after Close
select {
case ch.exit <- struct{}{}:
default:
}
return nil
}
}
func (ch *clickhouse) Close() (err error) {
ch.closeOnce.Do(func() {
err = ch.idle.Close()
ch.closed.Store(true)
})

return
}
Loading
Loading