diff --git a/telemetry/DESIGN.md b/telemetry/DESIGN.md index a9107c7..e1509c9 100644 --- a/telemetry/DESIGN.md +++ b/telemetry/DESIGN.md @@ -1580,7 +1580,42 @@ func (c *conn) Close() error { } ``` -### 9.2 Client Shutdown +### 9.2 Client Manager Shutdown + +The `clientManager` now includes a `shutdown()` method that provides graceful cleanup of all telemetry clients on application shutdown. This method: + +- Closes all active telemetry clients regardless of reference counts +- Logs warnings for any close failures +- Clears the clients map to prevent memory leaks +- Returns the last error encountered (if any) + +```go +// shutdown closes all telemetry clients and clears the manager. +// Integration points will be determined in Phase 4. +func (m *clientManager) shutdown() error { + m.mu.Lock() + defer m.mu.Unlock() + + var lastErr error + for host, holder := range m.clients { + if err := holder.client.close(); err != nil { + logger.Logger.Warn().Str("host", host).Err(err).Msg("error closing telemetry client during shutdown") + lastErr = err + } + } + // Clear the map + m.clients = make(map[string]*clientHolder) + return lastErr +} +``` + +**Integration Options** (to be implemented in Phase 4): + +1. **Public API**: Export a `Shutdown()` function for applications to call during their shutdown sequence +2. **Driver Hook**: Integrate with `sql.DB.Close()` or driver cleanup mechanisms +3. **Signal Handler**: Call from application signal handlers (SIGTERM, SIGINT) + +### 9.3 Client Shutdown ```go // close shuts down the telemetry client gracefully. @@ -1742,11 +1777,25 @@ func BenchmarkInterceptor_Disabled(b *testing.B) { - [x] Implement `tags.go` with tag definitions and filtering - [x] Add unit tests for configuration and tags -### Phase 2: Per-Host Management +### Phase 2: Per-Host Management ✅ COMPLETED - [x] Implement `featureflag.go` with caching and reference counting (PECOBLR-1146) -- [ ] Implement `manager.go` for client management -- [ ] Implement `circuitbreaker.go` with state machine -- [ ] Add unit tests for all components +- [x] Implement `manager.go` for client management (PECOBLR-1147) + - [x] Thread-safe singleton pattern with per-host client holders + - [x] Reference counting for automatic cleanup + - [x] Error handling for client start failures + - [x] Shutdown method for graceful application shutdown + - [x] Comprehensive documentation on thread-safety and connection sharing +- [x] Implement `client.go` with minimal telemetryClient stub (PECOBLR-1147) + - [x] Thread-safe start() and close() methods + - [x] Mutex protection for state flags + - [x] Detailed documentation on concurrent access requirements +- [x] Add comprehensive unit tests for all components (PECOBLR-1147) + - [x] Singleton pattern verification + - [x] Reference counting (increment/decrement/cleanup) + - [x] Concurrent access tests (100+ goroutines) + - [x] Shutdown scenarios (empty, with active refs, multiple hosts) + - [x] Race detector tests passing +- [ ] Implement `circuitbreaker.go` with state machine (PECOBLR-1148) ### Phase 3: Collection & Aggregation - [ ] Implement `interceptor.go` for metric collection @@ -1756,8 +1805,13 @@ func BenchmarkInterceptor_Disabled(b *testing.B) { ### Phase 4: Export - [ ] Implement `exporter.go` with retry logic -- [ ] Implement `client.go` for telemetry client +- [ ] Implement `client.go` for telemetry client with full functionality - [ ] Wire up circuit breaker with exporter +- [ ] Integrate shutdown method into driver lifecycle: + - [ ] Option 1: Export public 
`Shutdown()` API for applications to call + - [ ] Option 2: Hook into `sql.DB.Close()` or driver cleanup + - [ ] Option 3: Integrate with connection pool shutdown logic + - [ ] Document shutdown integration points and usage patterns - [ ] Add unit tests for export logic ### Phase 5: Driver Integration diff --git a/telemetry/client.go b/telemetry/client.go new file mode 100644 index 0000000..f097406 --- /dev/null +++ b/telemetry/client.go @@ -0,0 +1,54 @@ +package telemetry + +import ( + "net/http" + "sync" +) + +// telemetryClient represents a client for sending telemetry data to Databricks. +// +// Thread-Safety and Sharing: +// - One telemetryClient instance is shared across ALL connections to the same host +// - This prevents rate limiting by consolidating telemetry from multiple connections +// - The client MUST be fully thread-safe as it will be accessed concurrently +// - All methods (start, close, and future export methods) must use proper synchronization +// +// The mu mutex protects the started and closed flags. Future implementations in Phase 4 +// will need to ensure thread-safety for batch operations and flushing. +// +// This is a minimal stub implementation that will be fully implemented in Phase 4. +type telemetryClient struct { + host string + httpClient *http.Client + cfg *Config + mu sync.Mutex // Protects started and closed flags + started bool + closed bool +} + +// newTelemetryClient creates a new telemetry client for the given host. +func newTelemetryClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient { + return &telemetryClient{ + host: host, + httpClient: httpClient, + cfg: cfg, + } +} + +// start starts the telemetry client's background operations. +// This is a stub implementation that will be fully implemented in Phase 4. +func (c *telemetryClient) start() error { + c.mu.Lock() + defer c.mu.Unlock() + c.started = true + return nil +} + +// close stops the telemetry client and flushes any pending data. +// This is a stub implementation that will be fully implemented in Phase 4. +func (c *telemetryClient) close() error { + c.mu.Lock() + defer c.mu.Unlock() + c.closed = true + return nil +} diff --git a/telemetry/manager.go b/telemetry/manager.go new file mode 100644 index 0000000..ebeb6af --- /dev/null +++ b/telemetry/manager.go @@ -0,0 +1,110 @@ +package telemetry + +import ( + "net/http" + "sync" + + "github.com/databricks/databricks-sql-go/logger" +) + +// clientManager manages one telemetry client per host. +// +// Design: +// - Creates a single telemetryClient per host, shared across multiple connections +// - Prevents rate limiting by consolidating telemetry from all connections to the same host +// - Uses reference counting to track active connections and cleanup when last connection closes +// - Thread-safe using sync.RWMutex for concurrent access from multiple goroutines +// +// The manager handles synchronization for client lifecycle (create/release), +// while the telemetryClient itself must be thread-safe for concurrent data operations. +type clientManager struct { + mu sync.RWMutex // Protects the clients map + clients map[string]*clientHolder // host -> client holder mapping +} + +// clientHolder holds a telemetry client and its reference count. +type clientHolder struct { + client *telemetryClient + refCount int +} + +var ( + managerOnce sync.Once + managerInstance *clientManager +) + +// getClientManager returns the singleton instance. 
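+// The sync.Once guarantees the initialization below runs at most once, even
+// when multiple connections request the manager concurrently; every caller
+// receives the same instance for the lifetime of the process.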
+func getClientManager() *clientManager { + managerOnce.Do(func() { + managerInstance = &clientManager{ + clients: make(map[string]*clientHolder), + } + }) + return managerInstance +} + +// getOrCreateClient gets or creates a telemetry client for the host. +// Increments reference count. +func (m *clientManager) getOrCreateClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient { + m.mu.Lock() + defer m.mu.Unlock() + + holder, exists := m.clients[host] + if !exists { + client := newTelemetryClient(host, httpClient, cfg) + if err := client.start(); err != nil { + // Failed to start client, don't add to map + logger.Logger.Warn().Str("host", host).Err(err).Msg("failed to start telemetry client") + return nil + } + holder = &clientHolder{ + client: client, + } + m.clients[host] = holder + } + holder.refCount++ + return holder.client +} + +// releaseClient decrements reference count for the host. +// Closes and removes client when ref count reaches zero. +func (m *clientManager) releaseClient(host string) error { + m.mu.Lock() + holder, exists := m.clients[host] + if !exists { + m.mu.Unlock() + return nil + } + + holder.refCount-- + if holder.refCount < 0 { + // This should never happen - indicates a bug where releaseClient was called more than getOrCreateClient + logger.Logger.Debug().Str("host", host).Int("refCount", holder.refCount).Msg("telemetry client refCount became negative") + } + if holder.refCount <= 0 { + delete(m.clients, host) + m.mu.Unlock() + return holder.client.close() // Close and flush + } + + m.mu.Unlock() + return nil +} + +// shutdown closes all telemetry clients and clears the manager. +// This should be called on application shutdown. +func (m *clientManager) shutdown() error { + m.mu.Lock() + defer m.mu.Unlock() + + var lastErr error + for host, holder := range m.clients { + if err := holder.client.close(); err != nil { + logger.Logger.Warn().Str("host", host).Err(err).Msg("error closing telemetry client during shutdown") + lastErr = err + } + } + // Clear the map + m.clients = make(map[string]*clientHolder) + return lastErr +} diff --git a/telemetry/manager_test.go b/telemetry/manager_test.go new file mode 100644 index 0000000..59127e2 --- /dev/null +++ b/telemetry/manager_test.go @@ -0,0 +1,419 @@ +package telemetry + +import ( + "net/http" + "sync" + "testing" +) + +func TestGetClientManager_Singleton(t *testing.T) { + // Reset singleton for testing + managerInstance = nil + managerOnce = sync.Once{} + + manager1 := getClientManager() + manager2 := getClientManager() + + if manager1 != manager2 { + t.Error("Expected singleton instances to be the same") + } +} + +func TestClientManager_GetOrCreateClient(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + // First call should create client and increment refCount to 1 + client1 := manager.getOrCreateClient(host, httpClient, cfg) + if client1 == nil { + t.Fatal("Expected client to be created") + } + + holder, exists := manager.clients[host] + if !exists { + t.Fatal("Expected holder to exist in clients map") + } + if holder.refCount != 1 { + t.Errorf("Expected refCount to be 1, got %d", holder.refCount) + } + if !client1.started { + t.Error("Expected client to be started") + } + + // Second call should reuse client and increment refCount to 2 + client2 := manager.getOrCreateClient(host, httpClient, cfg) + if client2 != client1 { + t.Error("Expected to get the same 
client instance") + } + if holder.refCount != 2 { + t.Errorf("Expected refCount to be 2, got %d", holder.refCount) + } +} + +func TestClientManager_GetOrCreateClient_DifferentHosts(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host1 := "host1.databricks.com" + host2 := "host2.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + client1 := manager.getOrCreateClient(host1, httpClient, cfg) + client2 := manager.getOrCreateClient(host2, httpClient, cfg) + + if client1 == client2 { + t.Error("Expected different clients for different hosts") + } + + if len(manager.clients) != 2 { + t.Errorf("Expected 2 clients in manager, got %d", len(manager.clients)) + } +} + +func TestClientManager_ReleaseClient(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + // Create client with refCount = 2 + manager.getOrCreateClient(host, httpClient, cfg) + manager.getOrCreateClient(host, httpClient, cfg) + + // First release should decrement to 1 + err := manager.releaseClient(host) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + holder, exists := manager.clients[host] + if !exists { + t.Fatal("Expected holder to still exist") + } + if holder.refCount != 1 { + t.Errorf("Expected refCount to be 1, got %d", holder.refCount) + } + if holder.client.closed { + t.Error("Expected client not to be closed yet") + } + + // Second release should remove client + err = manager.releaseClient(host) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + _, exists = manager.clients[host] + if exists { + t.Error("Expected holder to be removed when refCount reaches 0") + } + if !holder.client.closed { + t.Error("Expected client to be closed when removed") + } +} + +func TestClientManager_ReleaseClient_NonExistent(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + // Release non-existent host should not panic or error + err := manager.releaseClient("non-existent-host.databricks.com") + if err != nil { + t.Errorf("Expected no error for non-existent host, got %v", err) + } +} + +func TestClientManager_ConcurrentAccess(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + numGoroutines := 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + // Concurrent getOrCreateClient + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + client := manager.getOrCreateClient(host, httpClient, cfg) + if client == nil { + t.Error("Expected client to be created") + } + }() + } + wg.Wait() + + // Verify refCount + holder, exists := manager.clients[host] + if !exists { + t.Fatal("Expected holder to exist") + } + if holder.refCount != numGoroutines { + t.Errorf("Expected refCount to be %d, got %d", numGoroutines, holder.refCount) + } + + // Concurrent releaseClient + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + _ = manager.releaseClient(host) + }() + } + wg.Wait() + + // Verify client is removed + _, exists = manager.clients[host] + if exists { + t.Error("Expected holder to be removed after all releases") + } +} + +func TestClientManager_ConcurrentAccessMultipleHosts(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + 
hosts := []string{ + "host1.databricks.com", + "host2.databricks.com", + "host3.databricks.com", + } + httpClient := &http.Client{} + cfg := DefaultConfig() + numGoroutinesPerHost := 50 + + var wg sync.WaitGroup + + // Concurrent access to multiple hosts + for _, host := range hosts { + for i := 0; i < numGoroutinesPerHost; i++ { + wg.Add(1) + go func(h string) { + defer wg.Done() + _ = manager.getOrCreateClient(h, httpClient, cfg) + }(host) + } + } + wg.Wait() + + // Verify all hosts have correct refCount + if len(manager.clients) != len(hosts) { + t.Errorf("Expected %d clients, got %d", len(hosts), len(manager.clients)) + } + + for _, host := range hosts { + holder, exists := manager.clients[host] + if !exists { + t.Errorf("Expected holder for host %s to exist", host) + continue + } + if holder.refCount != numGoroutinesPerHost { + t.Errorf("Expected refCount for host %s to be %d, got %d", host, numGoroutinesPerHost, holder.refCount) + } + } +} + +func TestClientManager_ReleaseClientPartial(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + // Create 5 references + for i := 0; i < 5; i++ { + manager.getOrCreateClient(host, httpClient, cfg) + } + + // Release 3 references + for i := 0; i < 3; i++ { + _ = manager.releaseClient(host) + } + + // Should still have 2 references + holder, exists := manager.clients[host] + if !exists { + t.Fatal("Expected holder to still exist") + } + if holder.refCount != 2 { + t.Errorf("Expected refCount to be 2, got %d", holder.refCount) + } + if holder.client.closed { + t.Error("Expected client not to be closed with remaining references") + } +} + +func TestClientManager_ClientStartCalled(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + client := manager.getOrCreateClient(host, httpClient, cfg) + + if !client.started { + t.Error("Expected start() to be called on new client") + } +} + +func TestClientManager_ClientCloseCalled(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + client := manager.getOrCreateClient(host, httpClient, cfg) + _ = manager.releaseClient(host) + + if !client.closed { + t.Error("Expected close() to be called when client is removed") + } +} + +func TestClientManager_MultipleGetOrCreateSameClient(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + // Get same client multiple times + client1 := manager.getOrCreateClient(host, httpClient, cfg) + client2 := manager.getOrCreateClient(host, httpClient, cfg) + client3 := manager.getOrCreateClient(host, httpClient, cfg) + + // All should be same instance + if client1 != client2 || client2 != client3 { + t.Error("Expected all calls to return the same client instance") + } + + // Verify refCount is 3 + holder := manager.clients[host] + if holder.refCount != 3 { + t.Errorf("Expected refCount to be 3, got %d", holder.refCount) + } +} + +func TestClientManager_Shutdown(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + hosts := []string{ + "host1.databricks.com", + "host2.databricks.com", + 
"host3.databricks.com", + } + httpClient := &http.Client{} + cfg := DefaultConfig() + + // Create clients for multiple hosts + clients := make([]*telemetryClient, 0, len(hosts)) + for _, host := range hosts { + client := manager.getOrCreateClient(host, httpClient, cfg) + clients = append(clients, client) + } + + // Verify clients are created + if len(manager.clients) != len(hosts) { + t.Errorf("Expected %d clients, got %d", len(hosts), len(manager.clients)) + } + + // Shutdown should close all clients + err := manager.shutdown() + if err != nil { + t.Errorf("Expected no error during shutdown, got %v", err) + } + + // Verify all clients are closed + for i, client := range clients { + if !client.closed { + t.Errorf("Expected client for host %s to be closed", hosts[i]) + } + } + + // Verify clients map is empty + if len(manager.clients) != 0 { + t.Errorf("Expected clients map to be empty after shutdown, got %d clients", len(manager.clients)) + } +} + +func TestClientManager_ShutdownWithActiveRefs(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + host := "test-host.databricks.com" + httpClient := &http.Client{} + cfg := DefaultConfig() + + // Create client with multiple references + client := manager.getOrCreateClient(host, httpClient, cfg) + manager.getOrCreateClient(host, httpClient, cfg) + manager.getOrCreateClient(host, httpClient, cfg) + + holder := manager.clients[host] + if holder.refCount != 3 { + t.Errorf("Expected refCount to be 3, got %d", holder.refCount) + } + + // Shutdown should still close client even with active references + err := manager.shutdown() + if err != nil { + t.Errorf("Expected no error during shutdown, got %v", err) + } + + // Verify client is closed + if !client.closed { + t.Error("Expected client to be closed after shutdown") + } + + // Verify clients map is empty + if len(manager.clients) != 0 { + t.Errorf("Expected clients map to be empty after shutdown, got %d clients", len(manager.clients)) + } +} + +func TestClientManager_ShutdownEmptyManager(t *testing.T) { + manager := &clientManager{ + clients: make(map[string]*clientHolder), + } + + // Shutdown on empty manager should not error + err := manager.shutdown() + if err != nil { + t.Errorf("Expected no error shutting down empty manager, got %v", err) + } + + // Verify map is still empty + if len(manager.clients) != 0 { + t.Errorf("Expected clients map to be empty, got %d clients", len(manager.clients)) + } +}