telemetry/DESIGN.md (60 additions, 6 deletions)

@@ -1580,7 +1580,42 @@ func (c *conn) Close() error {
}
```

### 9.2 Client Manager Shutdown

The `clientManager` now includes a `shutdown()` method that provides graceful cleanup of all telemetry clients on application shutdown. This method:

- Closes all active telemetry clients regardless of reference counts
- Logs warnings for any close failures
- Clears the clients map to prevent memory leaks
- Returns the last error encountered (if any)

```go
// shutdown closes all telemetry clients and clears the manager.
// Integration points will be determined in Phase 4.
func (m *clientManager) shutdown() error {
m.mu.Lock()
defer m.mu.Unlock()

var lastErr error
for host, holder := range m.clients {
if err := holder.client.close(); err != nil {
logger.Logger.Warn().Str("host", host).Err(err).Msg("error closing telemetry client during shutdown")
lastErr = err
}
}
// Clear the map
m.clients = make(map[string]*clientHolder)
return lastErr
}
```

**Integration Options** (to be implemented in Phase 4; options 1 and 3 are sketched after this list):

1. **Public API**: Export a `Shutdown()` function for applications to call during their shutdown sequence
2. **Driver Hook**: Integrate with `sql.DB.Close()` or driver cleanup mechanisms
3. **Signal Handler**: Call from application signal handlers (SIGTERM, SIGINT)
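As a rough illustration of options 1 and 3, the sketch below shows one possible shape for a public entry point and how an application might call it from a signal handler. The `Shutdown` name, the import path, and the wiring are assumptions, not the final Phase 4 API.

```go
// Hypothetical sketch only: a public wrapper that delegates to the manager's
// shutdown(). The exported name and its placement are not final.
func Shutdown() error {
	return getClientManager().shutdown()
}
```

On the application side, option 3 could then be as simple as:

```go
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	// Assumed import path for this package once it ships.
	"github.com/databricks/databricks-sql-go/telemetry"
)

func main() {
	// ... open connections, run queries ...

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
	<-sigCh

	// Flush all per-host telemetry clients before exiting.
	if err := telemetry.Shutdown(); err != nil {
		log.Printf("telemetry shutdown: %v", err)
	}
}
```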

### 9.3 Client Shutdown

```go
// close shuts down the telemetry client gracefully.
	// ... (remainder of this block collapsed in the diff view)
```

@@ -1742,11 +1777,25 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
- [x] Implement `tags.go` with tag definitions and filtering
- [x] Add unit tests for configuration and tags

### Phase 2: Per-Host Management βœ… COMPLETED
- [x] Implement `featureflag.go` with caching and reference counting (PECOBLR-1146)
- [x] Implement `manager.go` for client management (PECOBLR-1147)
- [x] Thread-safe singleton pattern with per-host client holders
- [x] Reference counting for automatic cleanup
- [x] Error handling for client start failures
- [x] Shutdown method for graceful application shutdown
- [x] Comprehensive documentation on thread-safety and connection sharing
- [x] Implement `client.go` with minimal telemetryClient stub (PECOBLR-1147)
- [x] Thread-safe start() and close() methods
- [x] Mutex protection for state flags
- [x] Detailed documentation on concurrent access requirements
- [x] Add comprehensive unit tests for all components (PECOBLR-1147)
- [x] Singleton pattern verification
- [x] Reference counting (increment/decrement/cleanup)
  - [x] Concurrent access tests (100+ goroutines; a sketch follows this list)
- [x] Shutdown scenarios (empty, with active refs, multiple hosts)
- [x] Race detector tests passing
- [ ] Implement `circuitbreaker.go` with state machine (PECOBLR-1148)
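For illustration only, here is a minimal sketch of the kind of concurrent-access test this checklist refers to, meant to be run with `go test -race`. The test name, host string, zero-value `Config`, and goroutine count are assumptions rather than the actual tests in this PR.

```go
package telemetry

import (
	"net/http"
	"sync"
	"testing"
)

// Hammers getOrCreateClient/releaseClient from many goroutines so the race
// detector can flag any unsynchronized access in the manager or client.
func TestClientManager_ConcurrentAccess(t *testing.T) {
	m := getClientManager()
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c := m.getOrCreateClient("example-host", http.DefaultClient, &Config{})
			if c == nil {
				t.Error("expected a telemetry client, got nil")
				return
			}
			if err := m.releaseClient("example-host"); err != nil {
				t.Errorf("releaseClient: %v", err)
			}
		}()
	}
	wg.Wait()
}
```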

### Phase 3: Collection & Aggregation
- [ ] Implement `interceptor.go` for metric collection
@@ -1756,8 +1805,13 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {

### Phase 4: Export
- [ ] Implement `exporter.go` with retry logic
- [ ] Implement `client.go` for telemetry client with full functionality
- [ ] Wire up circuit breaker with exporter
- [ ] Integrate shutdown method into driver lifecycle:
- [ ] Option 1: Export public `Shutdown()` API for applications to call
- [ ] Option 2: Hook into `sql.DB.Close()` or driver cleanup
- [ ] Option 3: Integrate with connection pool shutdown logic
- [ ] Document shutdown integration points and usage patterns
- [ ] Add unit tests for export logic

### Phase 5: Driver Integration
telemetry/client.go (new file, 54 additions)

@@ -0,0 +1,54 @@
package telemetry

import (
"net/http"
"sync"
)

// telemetryClient represents a client for sending telemetry data to Databricks.
//
// Thread-Safety and Sharing:
// - One telemetryClient instance is shared across ALL connections to the same host
// - This prevents rate limiting by consolidating telemetry from multiple connections
// - The client MUST be fully thread-safe as it will be accessed concurrently
// - All methods (start, close, and future export methods) must use proper synchronization
//
// The mu mutex protects the started and closed flags. Future implementations in Phase 4
// will need to ensure thread-safety for batch operations and flushing.
//
// This is a minimal stub implementation that will be fully implemented in Phase 4.
type telemetryClient struct {
host string
httpClient *http.Client
cfg *Config
mu sync.Mutex // Protects started and closed flags
started bool
closed bool
}

// newTelemetryClient creates a new telemetry client for the given host.
func newTelemetryClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
return &telemetryClient{
host: host,
httpClient: httpClient,
cfg: cfg,
}
}

// start starts the telemetry client's background operations.
// This is a stub implementation that will be fully implemented in Phase 4.
func (c *telemetryClient) start() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.started = true
	return nil
}

> **Reviewer:** do we need to make this and the method below thread-safe?
>
> **Author:** I was hoping to do it in a later PR, but it makes sense to do it right away. Done βœ…
>
> **Reviewer:** Why do we need to make it thread-safe?
>
> **Reviewer:** Because this client can be shared by multiple connections (it is created per host), the entire telemetry client has to be fully thread-safe. It would be really helpful to document this upfront.

// close stops the telemetry client and flushes any pending data.
// This is a stub implementation that will be fully implemented in Phase 4.
func (c *telemetryClient) close() error {
c.mu.Lock()
defer c.mu.Unlock()
c.closed = true
return nil
}
telemetry/manager.go (new file, 110 additions)

@@ -0,0 +1,110 @@
package telemetry

import (
"net/http"
"sync"

"github.com/databricks/databricks-sql-go/logger"
)

// clientManager manages one telemetry client per host.
//
// Design:
// - Creates a single telemetryClient per host, shared across multiple connections
// - Prevents rate limiting by consolidating telemetry from all connections to the same host
// - Uses reference counting to track active connections and cleanup when last connection closes
// - Thread-safe using sync.RWMutex for concurrent access from multiple goroutines
//
// The manager handles synchronization for client lifecycle (create/release),
// while the telemetryClient itself must be thread-safe for concurrent data operations.
type clientManager struct {
mu sync.RWMutex // Protects the clients map
clients map[string]*clientHolder // host -> client holder mapping
}

// clientHolder holds a telemetry client and its reference count.
type clientHolder struct {
client *telemetryClient
refCount int
}

var (
managerOnce sync.Once
managerInstance *clientManager
)

// getClientManager returns the singleton instance.
func getClientManager() *clientManager {
	managerOnce.Do(func() {
		managerInstance = &clientManager{
			clients: make(map[string]*clientHolder),
		}
	})
	return managerInstance
}

> **Reviewer:** do we want to put a max size or any LRU kind of cache?
>
> **Author:** We can't really evict clients with refCount > 0 (they're actively in use), and an entry is cleaned up anyway once its refCount reaches 0. Let me know if you think otherwise.

// getOrCreateClient gets or creates a telemetry client for the host.
// Increments reference count.
func (m *clientManager) getOrCreateClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
m.mu.Lock()
defer m.mu.Unlock()

holder, exists := m.clients[host]
if !exists {
client := newTelemetryClient(host, httpClient, cfg)
if err := client.start(); err != nil {
// Failed to start client, don't add to map
logger.Logger.Warn().Str("host", host).Err(err).Msg("failed to start telemetry client")
return nil
}
holder = &clientHolder{
client: client,
}
m.clients[host] = holder
}
holder.refCount++
return holder.client
}

// releaseClient decrements reference count for the host.
// Closes and removes client when ref count reaches zero.
func (m *clientManager) releaseClient(host string) error {
	m.mu.Lock()
	holder, exists := m.clients[host]
	if !exists {
		m.mu.Unlock()
		return nil
	}

	holder.refCount--
	if holder.refCount < 0 {
		// This should never happen: it indicates a bug where releaseClient was called more often than getOrCreateClient.
		logger.Logger.Debug().Str("host", host).Int("refCount", holder.refCount).Msg("telemetry client refCount became negative")
	}
	if holder.refCount <= 0 {
		delete(m.clients, host)
		m.mu.Unlock()
		return holder.client.close() // Close and flush outside the manager's lock
	}

	m.mu.Unlock()
	return nil
}

> **Reviewer:** can we log when this becomes negative? That will likely be a bug.
>
> **Reviewer:** Interesting. Closing the client after releasing the lock relies on the thread safety of the client and not the manager. I think this is an astute choice, because close can be IO-expensive due to last-minute flushes and the manager shouldn't be blocked.

// shutdown closes all telemetry clients and clears the manager.
// This should be called on application shutdown.
func (m *clientManager) shutdown() error {
m.mu.Lock()
defer m.mu.Unlock()

var lastErr error
for host, holder := range m.clients {
if err := holder.client.close(); err != nil {
logger.Logger.Warn().Str("host", host).Err(err).Msg("error closing telemetry client during shutdown")
lastErr = err
}
}
// Clear the map
m.clients = make(map[string]*clientHolder)
return lastErr
}
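To tie the pieces together, the sketch below shows how a connection-level caller might use the manager over its lifetime, with `shutdown()` reserved for application exit. The connection type, function names, and hook points are illustrative assumptions; the real wiring is Phase 5 work.

```go
package telemetry

import "net/http"

// exampleConn is a hypothetical connection type used only to illustrate the
// intended call pattern; the driver's real conn type is integrated in Phase 5.
type exampleConn struct {
	host      string
	telemetry *telemetryClient
}

func openExampleConn(host string, httpClient *http.Client, cfg *Config) *exampleConn {
	c := &exampleConn{host: host}
	// Shares one client with every other connection to the same host and
	// increments that host's reference count.
	c.telemetry = getClientManager().getOrCreateClient(host, httpClient, cfg)
	return c
}

func (c *exampleConn) Close() error {
	// Decrements the reference count; the shared client is closed and flushed
	// only when the last connection to this host is released.
	if c.telemetry != nil {
		return getClientManager().releaseClient(c.host)
	}
	return nil
}
```

On application exit (the Phase 4 integration point), `getClientManager().shutdown()` closes and flushes every remaining client regardless of outstanding references.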