[PECOBLR-1147] Implement Client Manager for Per-Host Clients #305
Changes from all commits: 264abfc, e778c67, a3920e8, b19da8b, e537239
New file (per-host telemetry client stub), `@@ -0,0 +1,54 @@`:

```go
package telemetry

import (
	"net/http"
	"sync"
)

// telemetryClient represents a client for sending telemetry data to Databricks.
//
// Thread-Safety and Sharing:
//   - One telemetryClient instance is shared across ALL connections to the same host
//   - This prevents rate limiting by consolidating telemetry from multiple connections
//   - The client MUST be fully thread-safe as it will be accessed concurrently
//   - All methods (start, close, and future export methods) must use proper synchronization
//
// The mu mutex protects the started and closed flags. Future implementations in Phase 4
// will need to ensure thread-safety for batch operations and flushing.
//
// This is a minimal stub implementation that will be fully implemented in Phase 4.
type telemetryClient struct {
	host       string
	httpClient *http.Client
	cfg        *Config
	mu         sync.Mutex // Protects started and closed flags
	started    bool
	closed     bool
}

// newTelemetryClient creates a new telemetry client for the given host.
func newTelemetryClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
	return &telemetryClient{
		host:       host,
		httpClient: httpClient,
		cfg:        cfg,
	}
}

// start starts the telemetry client's background operations.
// This is a stub implementation that will be fully implemented in Phase 4.
func (c *telemetryClient) start() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.started = true
	return nil
}

// close stops the telemetry client and flushes any pending data.
// This is a stub implementation that will be fully implemented in Phase 4.
func (c *telemetryClient) close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	return nil
}
```
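Because one telemetryClient instance is shared by every connection to the same host, even the stub's start/close paths are expected to tolerate concurrent callers. Below is a minimal sketch of how that requirement could be exercised under the race detector; the test name is hypothetical, and passing nil for the HTTP client and config is only acceptable here because the stub does not dereference them.

```go
package telemetry

import (
	"sync"
	"testing"
)

// TestTelemetryClientConcurrentStartClose (hypothetical) hammers start/close from
// many goroutines; `go test -race` would flag unsynchronized access to the flags.
func TestTelemetryClientConcurrentStartClose(t *testing.T) {
	c := newTelemetryClient("example.cloud.databricks.com", nil, nil) // stub ignores httpClient and cfg

	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); _ = c.start() }()
		go func() { defer wg.Done(); _ = c.close() }()
	}
	wg.Wait()
}
```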
New file (per-host client manager), `@@ -0,0 +1,110 @@`:

```go
package telemetry

import (
	"net/http"
	"sync"

	"github.com/databricks/databricks-sql-go/logger"
)

// clientManager manages one telemetry client per host.
//
// Design:
//   - Creates a single telemetryClient per host, shared across multiple connections
//   - Prevents rate limiting by consolidating telemetry from all connections to the same host
//   - Uses reference counting to track active connections and clean up when the last connection closes
//   - Thread-safe using sync.RWMutex for concurrent access from multiple goroutines
//
// The manager handles synchronization for client lifecycle (create/release),
// while the telemetryClient itself must be thread-safe for concurrent data operations.
type clientManager struct {
	mu      sync.RWMutex             // Protects the clients map
	clients map[string]*clientHolder // host -> client holder mapping
}

// clientHolder holds a telemetry client and its reference count.
type clientHolder struct {
	client   *telemetryClient
	refCount int
}

var (
	managerOnce     sync.Once
	managerInstance *clientManager
)

// getClientManager returns the singleton instance.
func getClientManager() *clientManager {
	managerOnce.Do(func() {
		managerInstance = &clientManager{
			clients: make(map[string]*clientHolder),
```
|
Collaborator: do we want to put a max size or any LRU kind of cache?

Author: We can't really evict clients with refCount > 0 (they're actively in use), and we clean this up anyway once refCount reaches 0. Let me know if you think otherwise.
```go
		}
	})
	return managerInstance
}

// getOrCreateClient gets or creates a telemetry client for the host.
// Increments reference count.
func (m *clientManager) getOrCreateClient(host string, httpClient *http.Client, cfg *Config) *telemetryClient {
	m.mu.Lock()
	defer m.mu.Unlock()

	holder, exists := m.clients[host]
	if !exists {
		client := newTelemetryClient(host, httpClient, cfg)
		if err := client.start(); err != nil {
			// Failed to start client, don't add to map
			logger.Logger.Warn().Str("host", host).Err(err).Msg("failed to start telemetry client")
			return nil
		}
		holder = &clientHolder{
			client: client,
		}
		m.clients[host] = holder
	}
	holder.refCount++
	return holder.client
}

// releaseClient decrements reference count for the host.
// Closes and removes client when ref count reaches zero.
func (m *clientManager) releaseClient(host string) error {
	m.mu.Lock()
	holder, exists := m.clients[host]
	if !exists {
		m.mu.Unlock()
		return nil
	}

	holder.refCount--
	if holder.refCount < 0 {
		// This should never happen - indicates a bug where releaseClient was called more times than getOrCreateClient
		logger.Logger.Debug().Str("host", host).Int("refCount", holder.refCount).Msg("telemetry client refCount became negative")
	}
	if holder.refCount <= 0 {
```
Collaborator: can we log when this becomes negative? That will likely be a bug.
```go
		delete(m.clients, host)
		m.mu.Unlock()
		return holder.client.close() // Close and flush
```
Review comment: Interesting. This relies on the thread safety of the client and not the manager. I think this is an astute thought, because close can be IO-expensive due to last-minute flushes, and the manager shouldn't be blocked.
```go
	}

	m.mu.Unlock()

	return nil
}

// shutdown closes all telemetry clients and clears the manager.
// This should be called on application shutdown.
func (m *clientManager) shutdown() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	var lastErr error
	for host, holder := range m.clients {
		if err := holder.client.close(); err != nil {
			logger.Logger.Warn().Str("host", host).Err(err).Msg("error closing telemetry client during shutdown")
			lastErr = err
		}
	}
	// Clear the map
	m.clients = make(map[string]*clientHolder)
	return lastErr
}
```
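For context, a rough sketch (not part of this PR) of how a connection could consume the manager: acquire the shared per-host client when the connection opens and release it when the connection closes. The conn type, function names, and host value are placeholders chosen for illustration.

```go
package telemetry

import "net/http"

// conn is a hypothetical connection type used only to illustrate the intended
// acquire/release pattern around the client manager.
type conn struct {
	host            string
	telemetryClient *telemetryClient
}

func openConn(host string, httpClient *http.Client, cfg *Config) *conn {
	c := &conn{host: host}
	// Every connection to the same host gets the same shared client; refCount is incremented.
	c.telemetryClient = getClientManager().getOrCreateClient(host, httpClient, cfg)
	return c
}

func (c *conn) Close() error {
	// When the last connection to the host closes, the shared client is closed and flushed.
	return getClientManager().releaseClient(c.host)
}
```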
Review thread:

"do we need to make this and the method below thread safe?"

"I was hoping to do it in a later PR, but it makes sense to do it right away! Done."

"Why do we need to make it thread safe?"

"This can be shared by multiple connections because the client is at host level, so the entire telemetry client has to be fully thread-safe. It would be really helpful to document this upfront."
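To make the shared-client argument concrete, here is a hedged sketch of the kind of race-detector test that motivates locking in shutdown and the other manager methods. The test name and iteration counts are illustrative, and nil arguments are only acceptable because the Phase 3 client stub never uses them.

```go
package telemetry

import (
	"sync"
	"testing"
)

// TestClientManagerConcurrentAccess (hypothetical) simulates several connections to
// the same host racing against a shutdown; `go test -race` would surface any
// unprotected access to the manager's clients map.
func TestClientManagerConcurrentAccess(t *testing.T) {
	m := getClientManager()

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if c := m.getOrCreateClient("example.cloud.databricks.com", nil, nil); c != nil {
				_ = m.releaseClient("example.cloud.databricks.com")
			}
		}()
	}
	wg.Add(1)
	go func() { defer wg.Done(); _ = m.shutdown() }()
	wg.Wait()
}
```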