- 
                Notifications
    You must be signed in to change notification settings 
- Fork 242
feat: add telemetry scheduler #1107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 40 commits
8a50583
              0c154b8
              2ccb148
              de5b70a
              fc93d2c
              3c7498e
              0a406ba
              6f9c638
              bf26d59
              c7205ca
              8c8a4bd
              4228142
              803349d
              51f0373
              58a7b02
              1ed184b
              2d66573
              7a2f452
              a146a1e
              871ade0
              4c15f3a
              b2c7dc4
              9d368d4
              d2b2a6f
              a357417
              992feaf
              bc5d4b8
              f89d05d
              ec10979
              17d1a15
              d206afe
              06fd3f8
              193c8ad
              87ce064
              3f2b23c
              2b718b0
              95106d8
              d61df59
              6be99a3
              e18a54d
              37e1bad
              9ae389d
              c654832
              ceef544
              b8d15d6
              43b7812
              7c87bb6
              b1bfe59
              e0478a2
              ea10ecb
              cda84db
              e2badbb
              3e60131
              885d057
              fc5f97c
              1f821a2
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -13,3 +13,4 @@ coverage: | |
| threshold: 0.5% | ||
| ignore: | ||
| - "log_fallback.go" | ||
| - "internal/testutils" | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -15,6 +15,10 @@ import ( | |
|  | ||
| "github.com/getsentry/sentry-go/internal/debug" | ||
| "github.com/getsentry/sentry-go/internal/debuglog" | ||
| httpInternal "github.com/getsentry/sentry-go/internal/http" | ||
| "github.com/getsentry/sentry-go/internal/protocol" | ||
| "github.com/getsentry/sentry-go/internal/ratelimit" | ||
| "github.com/getsentry/sentry-go/internal/telemetry" | ||
| ) | ||
|  | ||
| // The identifier of the SDK. | ||
|  | @@ -249,6 +253,8 @@ type ClientOptions struct { | |
| // | ||
| // By default, this is empty and all status codes are traced. | ||
| TraceIgnoreStatusCodes [][]int | ||
| // EnableTelemetryBuffer enables the telemetry buffer layer for prioritized delivery of events. | ||
| EnableTelemetryBuffer bool | ||
| } | ||
|  | ||
| // Client is the underlying processor that is used by the main API and Hub | ||
|  | @@ -263,8 +269,10 @@ type Client struct { | |
| sdkVersion string | ||
| // Transport is read-only. Replacing the transport of an existing client is | ||
| // not supported, create a new client instead. | ||
| Transport Transport | ||
| batchLogger *BatchLogger | ||
| Transport Transport | ||
| batchLogger *BatchLogger | ||
| telemetryBuffers map[ratelimit.Category]*telemetry.Buffer[protocol.EnvelopeItemConvertible] | ||
| telemetryScheduler *telemetry.Scheduler | ||
| } | ||
|  | ||
| // NewClient creates and returns an instance of Client configured using | ||
|  | @@ -364,12 +372,15 @@ func NewClient(options ClientOptions) (*Client, error) { | |
| sdkVersion: SDKVersion, | ||
| } | ||
|  | ||
| if options.EnableLogs { | ||
| client.setupTransport() | ||
|  | ||
| if options.EnableTelemetryBuffer { | ||
| client.setupTelemetryBuffer() | ||
| } else if options.EnableLogs { | ||
| client.batchLogger = NewBatchLogger(&client) | ||
| client.batchLogger.Start() | ||
| } | ||
|  | ||
| client.setupTransport() | ||
| client.setupIntegrations() | ||
|  | ||
| return &client, nil | ||
|  | @@ -391,6 +402,37 @@ func (client *Client) setupTransport() { | |
| client.Transport = transport | ||
| } | ||
|  | ||
| func (client *Client) setupTelemetryBuffer() { | ||
| if !client.options.EnableTelemetryBuffer { | ||
| return | ||
| } | ||
|  | ||
| if client.dsn == nil { | ||
| debuglog.Println("Telemetry buffer disabled: no DSN configured") | ||
| return | ||
| } | ||
|  | ||
| transport := httpInternal.NewAsyncTransport(httpInternal.TransportOptions{ | ||
| Dsn: client.options.Dsn, | ||
| HTTPClient: client.options.HTTPClient, | ||
| HTTPTransport: client.options.HTTPTransport, | ||
| HTTPProxy: client.options.HTTPProxy, | ||
| HTTPSProxy: client.options.HTTPSProxy, | ||
| CaCerts: client.options.CaCerts, | ||
| }) | ||
| client.Transport = &internalAsyncTransportAdapter{transport: transport} | ||
|  | ||
| client.telemetryBuffers = map[ratelimit.Category]*telemetry.Buffer[protocol.EnvelopeItemConvertible]{ | ||
| ratelimit.CategoryError: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0), | ||
| ratelimit.CategoryTransaction: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0), | ||
| ratelimit.CategoryLog: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second), | ||
| ratelimit.CategoryMonitor: telemetry.NewBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0), | ||
| } | ||
|  | ||
| client.telemetryScheduler = telemetry.NewScheduler(client.telemetryBuffers, transport, &client.dsn.Dsn) | ||
| client.telemetryScheduler.Start() | ||
| } | ||
|  | ||
| func (client *Client) setupIntegrations() { | ||
| integrations := []Integration{ | ||
| new(contextifyFramesIntegration), | ||
|  | @@ -531,7 +573,7 @@ func (client *Client) RecoverWithContext( | |
| // the network synchronously, configure it to use the HTTPSyncTransport in the | ||
| // call to Init. | ||
| func (client *Client) Flush(timeout time.Duration) bool { | ||
| if client.batchLogger != nil { | ||
| if client.batchLogger != nil || client.telemetryScheduler != nil { | ||
| ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||
| defer cancel() | ||
| return client.FlushWithContext(ctx) | ||
|  | @@ -555,6 +597,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool { | |
| if client.batchLogger != nil { | ||
| client.batchLogger.Flush(ctx.Done()) | ||
| } | ||
| if client.telemetryScheduler != nil { | ||
| client.telemetryScheduler.FlushWithContext(ctx) | ||
| } | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Batch Logger Flush IgnoredIn FlushWithContext, when client.batchLogger is not nil, the method calls Flush on it but ignores the return value and doesn't return early. This causes the method to fall through and potentially call Transport.FlushWithContext, losing the result of batchLogger.Flush(). If batchLogger.Flush() returns false (indicating timeout), this result will be ignored, and the method could incorrectly return true based on a subsequent Transport.FlushWithContext() call. The batchLogger flush result should be returned or at least considered. | ||
| return client.Transport.FlushWithContext(ctx) | ||
| } | ||
|  | ||
|  | @@ -563,6 +608,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool { | |
| // Close should be called after Flush and before terminating the program | ||
| // otherwise some events may be lost. | ||
| func (client *Client) Close() { | ||
| if client.telemetryScheduler != nil { | ||
| client.telemetryScheduler.Stop(5 * time.Second) | ||
| } | ||
| client.Transport.Close() | ||
| } | ||
|  | ||
|  | @@ -683,7 +731,19 @@ func (client *Client) processEvent(event *Event, hint *EventHint, scope EventMod | |
| } | ||
| } | ||
|  | ||
| client.Transport.SendEvent(event) | ||
| if client.telemetryScheduler != nil { | ||
| category := event.toCategory() | ||
| if buffer, ok := client.telemetryBuffers[category]; ok { | ||
| buffer.Offer(event) | ||
| client.telemetryScheduler.Signal() | ||
| } else { | ||
| // fallback if we get an event type with unknown category. this shouldn't happen | ||
| debuglog.Printf("Unknown category for event type %s, sending directly", event.Type) | ||
| client.Transport.SendEvent(event) | ||
| } | ||
| } else { | ||
| client.Transport.SendEvent(event) | ||
| } | ||
|  | ||
| return &event.EventID | ||
| } | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -1,233 +1,37 @@ | ||
| package sentry | ||
|  | ||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/url" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
| "github.com/getsentry/sentry-go/internal/protocol" | ||
| ) | ||
|  | ||
| type scheme string | ||
| // Re-export protocol types to maintain public API compatibility | ||
|  | ||
| const ( | ||
| schemeHTTP scheme = "http" | ||
| schemeHTTPS scheme = "https" | ||
| ) | ||
|  | ||
| func (scheme scheme) defaultPort() int { | ||
| switch scheme { | ||
| case schemeHTTPS: | ||
| return 443 | ||
| case schemeHTTP: | ||
| return 80 | ||
| default: | ||
| return 80 | ||
| } | ||
| // Dsn is used as the remote address source to client transport. | ||
| type Dsn struct { | ||
| protocol.Dsn | ||
| } | ||
|  | ||
| // DsnParseError represents an error that occurs if a Sentry | ||
| // DSN cannot be parsed. | ||
| type DsnParseError struct { | ||
| Message string | ||
| } | ||
|  | ||
| func (e DsnParseError) Error() string { | ||
| return "[Sentry] DsnParseError: " + e.Message | ||
| } | ||
|  | ||
| // Dsn is used as the remote address source to client transport. | ||
| type Dsn struct { | ||
| scheme scheme | ||
| publicKey string | ||
| secretKey string | ||
| host string | ||
| port int | ||
| path string | ||
| projectID string | ||
| } | ||
| type DsnParseError = protocol.DsnParseError | ||
|  | ||
| // NewDsn creates a Dsn by parsing rawURL. Most users will never call this | ||
| // function directly. It is provided for use in custom Transport | ||
| // implementations. | ||
| func NewDsn(rawURL string) (*Dsn, error) { | ||
| // Parse | ||
| parsedURL, err := url.Parse(rawURL) | ||
| protocolDsn, err := protocol.NewDsn(rawURL) | ||
| if err != nil { | ||
| return nil, &DsnParseError{fmt.Sprintf("invalid url: %v", err)} | ||
| } | ||
|  | ||
| // Scheme | ||
| var scheme scheme | ||
| switch parsedURL.Scheme { | ||
| case "http": | ||
| scheme = schemeHTTP | ||
| case "https": | ||
| scheme = schemeHTTPS | ||
| default: | ||
| return nil, &DsnParseError{"invalid scheme"} | ||
| } | ||
|  | ||
| // PublicKey | ||
| publicKey := parsedURL.User.Username() | ||
| if publicKey == "" { | ||
| return nil, &DsnParseError{"empty username"} | ||
| } | ||
|  | ||
| // SecretKey | ||
| var secretKey string | ||
| if parsedSecretKey, ok := parsedURL.User.Password(); ok { | ||
| secretKey = parsedSecretKey | ||
| } | ||
|  | ||
| // Host | ||
| host := parsedURL.Hostname() | ||
| if host == "" { | ||
| return nil, &DsnParseError{"empty host"} | ||
| } | ||
|  | ||
| // Port | ||
| var port int | ||
| if p := parsedURL.Port(); p != "" { | ||
| port, err = strconv.Atoi(p) | ||
| if err != nil { | ||
| return nil, &DsnParseError{"invalid port"} | ||
| } | ||
| } else { | ||
| port = scheme.defaultPort() | ||
| } | ||
|  | ||
| // ProjectID | ||
| if parsedURL.Path == "" || parsedURL.Path == "/" { | ||
| return nil, &DsnParseError{"empty project id"} | ||
| } | ||
| pathSegments := strings.Split(parsedURL.Path[1:], "/") | ||
| projectID := pathSegments[len(pathSegments)-1] | ||
|  | ||
| if projectID == "" { | ||
| return nil, &DsnParseError{"empty project id"} | ||
| } | ||
|  | ||
| // Path | ||
| var path string | ||
| if len(pathSegments) > 1 { | ||
| path = "/" + strings.Join(pathSegments[0:len(pathSegments)-1], "/") | ||
| } | ||
|  | ||
| return &Dsn{ | ||
| scheme: scheme, | ||
| publicKey: publicKey, | ||
| secretKey: secretKey, | ||
| host: host, | ||
| port: port, | ||
| path: path, | ||
| projectID: projectID, | ||
| }, nil | ||
| } | ||
|  | ||
| // String formats Dsn struct into a valid string url. | ||
| func (dsn Dsn) String() string { | ||
| var url string | ||
| url += fmt.Sprintf("%s://%s", dsn.scheme, dsn.publicKey) | ||
| if dsn.secretKey != "" { | ||
| url += fmt.Sprintf(":%s", dsn.secretKey) | ||
| } | ||
| url += fmt.Sprintf("@%s", dsn.host) | ||
| if dsn.port != dsn.scheme.defaultPort() { | ||
| url += fmt.Sprintf(":%d", dsn.port) | ||
| return nil, err | ||
| } | ||
| if dsn.path != "" { | ||
| url += dsn.path | ||
| } | ||
| url += fmt.Sprintf("/%s", dsn.projectID) | ||
| return url | ||
| } | ||
|  | ||
| // Get the scheme of the DSN. | ||
| func (dsn Dsn) GetScheme() string { | ||
| return string(dsn.scheme) | ||
| } | ||
|  | ||
| // Get the public key of the DSN. | ||
| func (dsn Dsn) GetPublicKey() string { | ||
| return dsn.publicKey | ||
| } | ||
|  | ||
| // Get the secret key of the DSN. | ||
| func (dsn Dsn) GetSecretKey() string { | ||
| return dsn.secretKey | ||
| } | ||
|  | ||
| // Get the host of the DSN. | ||
| func (dsn Dsn) GetHost() string { | ||
| return dsn.host | ||
| } | ||
|  | ||
| // Get the port of the DSN. | ||
| func (dsn Dsn) GetPort() int { | ||
| return dsn.port | ||
| } | ||
|  | ||
| // Get the path of the DSN. | ||
| func (dsn Dsn) GetPath() string { | ||
| return dsn.path | ||
| return &Dsn{Dsn: *protocolDsn}, nil | ||
| } | ||
|  | ||
| // Get the project ID of the DSN. | ||
| func (dsn Dsn) GetProjectID() string { | ||
| return dsn.projectID | ||
| } | ||
|  | ||
| // GetAPIURL returns the URL of the envelope endpoint of the project | ||
| // associated with the DSN. | ||
| func (dsn Dsn) GetAPIURL() *url.URL { | ||
| var rawURL string | ||
| rawURL += fmt.Sprintf("%s://%s", dsn.scheme, dsn.host) | ||
| if dsn.port != dsn.scheme.defaultPort() { | ||
| rawURL += fmt.Sprintf(":%d", dsn.port) | ||
| } | ||
| if dsn.path != "" { | ||
| rawURL += dsn.path | ||
| } | ||
| rawURL += fmt.Sprintf("/api/%s/%s/", dsn.projectID, "envelope") | ||
| parsedURL, _ := url.Parse(rawURL) | ||
| return parsedURL | ||
| } | ||
|  | ||
| // RequestHeaders returns all the necessary headers that have to be used in the transport when seinding events | ||
| // RequestHeaders returns all the necessary headers that have to be used in the transport when sending events | ||
| // to the /store endpoint. | ||
| // | ||
| // Deprecated: This method shall only be used if you want to implement your own transport that sends events to | ||
| // the /store endpoint. If you're using the transport provided by the SDK, all necessary headers to authenticate | ||
| // against the /envelope endpoint are added automatically. | ||
| func (dsn Dsn) RequestHeaders() map[string]string { | ||
| auth := fmt.Sprintf("Sentry sentry_version=%s, sentry_timestamp=%d, "+ | ||
| "sentry_client=sentry.go/%s, sentry_key=%s", apiVersion, time.Now().Unix(), SDKVersion, dsn.publicKey) | ||
|  | ||
| if dsn.secretKey != "" { | ||
| auth = fmt.Sprintf("%s, sentry_secret=%s", auth, dsn.secretKey) | ||
| } | ||
|  | ||
| return map[string]string{ | ||
| "Content-Type": "application/json", | ||
| "X-Sentry-Auth": auth, | ||
| } | ||
| } | ||
|  | ||
| // MarshalJSON converts the Dsn struct to JSON. | ||
| func (dsn Dsn) MarshalJSON() ([]byte, error) { | ||
| return json.Marshal(dsn.String()) | ||
| } | ||
|  | ||
| // UnmarshalJSON converts JSON data to the Dsn struct. | ||
| func (dsn *Dsn) UnmarshalJSON(data []byte) error { | ||
| var str string | ||
| _ = json.Unmarshal(data, &str) | ||
| newDsn, err := NewDsn(str) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| *dsn = *newDsn | ||
| return nil | ||
| return dsn.Dsn.RequestHeaders(SDKVersion) | ||
| } | 
Uh oh!
There was an error while loading. Please reload this page.