diff --git a/.codecov.yml b/.codecov.yml index 0acb2e27f..0c0e695f2 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -16,3 +16,4 @@ coverage: informational: true ignore: - "log_fallback.go" + - "internal/testutils" diff --git a/client.go b/client.go index 346230223..73a04bb72 100644 --- a/client.go +++ b/client.go @@ -15,6 +15,10 @@ import ( "github.com/getsentry/sentry-go/internal/debug" "github.com/getsentry/sentry-go/internal/debuglog" + httpInternal "github.com/getsentry/sentry-go/internal/http" + "github.com/getsentry/sentry-go/internal/protocol" + "github.com/getsentry/sentry-go/internal/ratelimit" + "github.com/getsentry/sentry-go/internal/telemetry" ) // The identifier of the SDK. @@ -249,6 +253,8 @@ type ClientOptions struct { // // By default, this is empty and all status codes are traced. TraceIgnoreStatusCodes [][]int + // EnableTelemetryBuffer enables the telemetry buffer layer for prioritized delivery of events. + EnableTelemetryBuffer bool } // Client is the underlying processor that is used by the main API and Hub @@ -263,8 +269,9 @@ type Client struct { sdkVersion string // Transport is read-only. Replacing the transport of an existing client is // not supported, create a new client instead. - Transport Transport - batchLogger *BatchLogger + Transport Transport + batchLogger *BatchLogger + telemetryBuffer *telemetry.Buffer } // NewClient creates and returns an instance of Client configured using @@ -364,12 +371,15 @@ func NewClient(options ClientOptions) (*Client, error) { sdkVersion: SDKVersion, } - if options.EnableLogs { + client.setupTransport() + + if options.EnableTelemetryBuffer { + client.setupTelemetryBuffer() + } else if options.EnableLogs { client.batchLogger = NewBatchLogger(&client) client.batchLogger.Start() } - client.setupTransport() client.setupIntegrations() return &client, nil @@ -391,6 +401,41 @@ func (client *Client) setupTransport() { client.Transport = transport } +func (client *Client) setupTelemetryBuffer() { + if !client.options.EnableTelemetryBuffer { + return + } + + if client.dsn == nil { + debuglog.Println("Telemetry buffer disabled: no DSN configured") + return + } + + transport := httpInternal.NewAsyncTransport(httpInternal.TransportOptions{ + Dsn: client.options.Dsn, + HTTPClient: client.options.HTTPClient, + HTTPTransport: client.options.HTTPTransport, + HTTPProxy: client.options.HTTPProxy, + HTTPSProxy: client.options.HTTPSProxy, + CaCerts: client.options.CaCerts, + }) + client.Transport = &internalAsyncTransportAdapter{transport: transport} + + storage := map[ratelimit.Category]telemetry.Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 100, telemetry.OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryTransaction: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTransaction, 1000, telemetry.OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryLog: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 100, telemetry.OverflowPolicyDropOldest, 100, 5*time.Second), + ratelimit.CategoryMonitor: telemetry.NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryMonitor, 100, telemetry.OverflowPolicyDropOldest, 1, 0), + } + + sdkInfo := &protocol.SdkInfo{ + Name: client.sdkIdentifier, + Version: client.sdkVersion, + } + + client.telemetryBuffer = telemetry.NewBuffer(storage, transport, &client.dsn.Dsn, sdkInfo) +} + func (client *Client) setupIntegrations() { integrations := 
[]Integration{ new(contextifyFramesIntegration), @@ -531,7 +576,7 @@ func (client *Client) RecoverWithContext( // the network synchronously, configure it to use the HTTPSyncTransport in the // call to Init. func (client *Client) Flush(timeout time.Duration) bool { - if client.batchLogger != nil { + if client.batchLogger != nil || client.telemetryBuffer != nil { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() return client.FlushWithContext(ctx) @@ -555,6 +600,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool { if client.batchLogger != nil { client.batchLogger.Flush(ctx.Done()) } + if client.telemetryBuffer != nil { + return client.telemetryBuffer.FlushWithContext(ctx) + } return client.Transport.FlushWithContext(ctx) } @@ -563,6 +611,9 @@ func (client *Client) FlushWithContext(ctx context.Context) bool { // Close should be called after Flush and before terminating the program // otherwise some events may be lost. func (client *Client) Close() { + if client.telemetryBuffer != nil { + client.telemetryBuffer.Close(5 * time.Second) + } client.Transport.Close() } @@ -683,7 +734,13 @@ func (client *Client) processEvent(event *Event, hint *EventHint, scope EventMod } } - client.Transport.SendEvent(event) + if client.telemetryBuffer != nil { + if !client.telemetryBuffer.Add(event) { + debuglog.Println("Event dropped: telemetry buffer full or unavailable") + } + } else { + client.Transport.SendEvent(event) + } return &event.EventID } diff --git a/client_test.go b/client_test.go index de9d43418..1440d5380 100644 --- a/client_test.go +++ b/client_test.go @@ -1,6 +1,7 @@ package sentry import ( + "bytes" "context" "encoding/json" "errors" @@ -12,6 +13,7 @@ import ( "testing" "time" + "github.com/getsentry/sentry-go/internal/debuglog" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" pkgErrors "github.com/pkg/errors" @@ -1027,3 +1029,46 @@ func TestClientSetsUpTransport(t *testing.T) { client, _ = NewClient(ClientOptions{}) require.IsType(t, &noopTransport{}, client.Transport) } + +func TestClient_SetupTelemetryBuffer_WithDSN(t *testing.T) { + client, err := NewClient(ClientOptions{ + Dsn: "https://public@localhost/1", + EnableTelemetryBuffer: true, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if client.telemetryBuffer == nil { + t.Fatal("expected telemetryBuffer to be initialized") + } + + if _, ok := client.Transport.(*internalAsyncTransportAdapter); !ok { + t.Fatalf("expected internalAsyncTransportAdapter, got %T", client.Transport) + } + + if !client.telemetryBuffer.Add(NewEvent()) { + t.Fatal("expected Add to succeed with default buffers") + } +} + +func TestClient_SetupTelemetryBuffer_NoDSN(t *testing.T) { + var buf bytes.Buffer + debuglog.SetOutput(&buf) + defer debuglog.SetOutput(&bytes.Buffer{}) + + client, err := NewClient(ClientOptions{ + EnableTelemetryBuffer: true, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if client.telemetryBuffer != nil { + t.Fatal("expected telemetryBuffer to be nil when DSN is missing") + } + + if _, ok := client.Transport.(*noopTransport); !ok { + t.Fatalf("expected noopTransport, got %T", client.Transport) + } +} diff --git a/hub_test.go b/hub_test.go index ee98051ea..6f92a77e0 100644 --- a/hub_test.go +++ b/hub_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/getsentry/sentry-go/internal/protocol" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" ) @@ -177,9 +178,9 @@ func TestConfigureScope(t *testing.T) { 
} func TestLastEventID(t *testing.T) { - uuid := EventID(uuid()) - hub := &Hub{lastEventID: uuid} - assertEqual(t, uuid, hub.LastEventID()) + eventID := EventID(protocol.GenerateEventID()) + hub := &Hub{lastEventID: eventID} + assertEqual(t, eventID, hub.LastEventID()) } func TestLastEventIDUpdatesAfterCaptures(t *testing.T) { diff --git a/interfaces.go b/interfaces.go index ea2f8d883..d7c197df1 100644 --- a/interfaces.go +++ b/interfaces.go @@ -493,6 +493,70 @@ func (e *Event) ToEnvelopeWithTime(dsn *protocol.Dsn, sentAt time.Time) (*protoc return envelope, nil } +// ToEnvelopeItem converts the Event to a Sentry envelope item. +func (e *Event) ToEnvelopeItem() (*protocol.EnvelopeItem, error) { + eventBody, err := json.Marshal(e) + if err != nil { + // Try fallback: remove problematic fields and retry + e.Breadcrumbs = nil + e.Contexts = nil + e.Extra = map[string]interface{}{ + "info": fmt.Sprintf("Could not encode original event as JSON. "+ + "Succeeded by removing Breadcrumbs, Contexts and Extra. "+ + "Please verify the data you attach to the scope. "+ + "Error: %s", err), + } + + eventBody, err = json.Marshal(e) + if err != nil { + return nil, fmt.Errorf("event could not be marshaled even with fallback: %w", err) + } + + DebugLogger.Printf("Event marshaling succeeded with fallback after removing problematic fields") + } + + // TODO: all event types should be abstracted to implement EnvelopeItemConvertible and convert themselves. + var item *protocol.EnvelopeItem + switch e.Type { + case transactionType: + item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeTransaction, eventBody) + case checkInType: + item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeCheckIn, eventBody) + case logEvent.Type: + item = protocol.NewLogItem(len(e.Logs), eventBody) + default: + item = protocol.NewEnvelopeItem(protocol.EnvelopeItemTypeEvent, eventBody) + } + + return item, nil +} + +// GetCategory returns the rate limit category for this event. +func (e *Event) GetCategory() ratelimit.Category { + return e.toCategory() +} + +// GetEventID returns the event ID. +func (e *Event) GetEventID() string { + return string(e.EventID) +} + +// GetSdkInfo returns SDK information for the envelope header. +func (e *Event) GetSdkInfo() *protocol.SdkInfo { + return &e.Sdk +} + +// GetDynamicSamplingContext returns trace context for the envelope header. +func (e *Event) GetDynamicSamplingContext() map[string]string { + trace := make(map[string]string) + if dsc := e.sdkMetaData.dsc; dsc.HasEntries() { + for k, v := range dsc.Entries { + trace[k] = v + } + } + return trace +} + // TODO: Event.Contexts map[string]interface{} => map[string]EventContext, // to prevent accidentally storing T when we mean *T. // For example, the TraceContext must be stored as *TraceContext to pick up the @@ -667,6 +731,69 @@ type Log struct { Attributes map[string]Attribute `json:"attributes,omitempty"` } +// ToEnvelopeItem converts the Log to a Sentry envelope item for batching. 
+func (l *Log) ToEnvelopeItem() (*protocol.EnvelopeItem, error) { + type logJSON struct { + Timestamp *float64 `json:"timestamp,omitempty"` + TraceID string `json:"trace_id,omitempty"` + Level string `json:"level"` + Severity int `json:"severity_number,omitempty"` + Body string `json:"body,omitempty"` + Attributes map[string]protocol.LogAttribute `json:"attributes,omitempty"` + } + + // Convert time.Time to seconds float if set + var ts *float64 + if !l.Timestamp.IsZero() { + sec := float64(l.Timestamp.UnixNano()) / 1e9 + ts = &sec + } + + attrs := make(map[string]protocol.LogAttribute, len(l.Attributes)) + for k, v := range l.Attributes { + attrs[k] = protocol.LogAttribute{Value: v.Value, Type: string(v.Type)} + } + + logData, err := json.Marshal(logJSON{ + Timestamp: ts, + TraceID: l.TraceID.String(), + Level: string(l.Level), + Severity: l.Severity, + Body: l.Body, + Attributes: attrs, + }) + if err != nil { + return nil, err + } + + return &protocol.EnvelopeItem{ + Header: &protocol.EnvelopeItemHeader{ + Type: protocol.EnvelopeItemTypeLog, + }, + Payload: logData, + }, nil +} + +// GetCategory returns the rate limit category for logs. +func (l *Log) GetCategory() ratelimit.Category { + return ratelimit.CategoryLog +} + +// GetEventID returns empty string (event ID set when batching). +func (l *Log) GetEventID() string { + return "" +} + +// GetSdkInfo returns nil (SDK info set when batching). +func (l *Log) GetSdkInfo() *protocol.SdkInfo { + return nil +} + +// GetDynamicSamplingContext returns nil (trace context set when batching). +func (l *Log) GetDynamicSamplingContext() map[string]string { + return nil +} + type AttrType string const ( diff --git a/interfaces_test.go b/interfaces_test.go index 9c2165803..795c1f8b8 100644 --- a/interfaces_test.go +++ b/interfaces_test.go @@ -915,12 +915,122 @@ func TestEvent_ToEnvelope_FallbackOnMarshalError(t *testing.T) { } info, exists := extra["info"].(string) - if !exists { - t.Error("Expected info field in extra after fallback") + if !exists || !strings.Contains(info, "Could not encode original event as JSON") { + t.Fatal("Expected fallback info message in extra field for ToEnvelopeItem") + } +} + +func TestEvent_ToEnvelopeItem_FallbackOnMarshalError(t *testing.T) { + unmarshalableFunc := func() string { return "test" } + + event := &Event{ + EventID: "12345678901234567890123456789012", + Message: "test message with fallback", + Level: LevelError, + Timestamp: time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + Extra: map[string]interface{}{ + "bad_data": unmarshalableFunc, + }, + } + + item, err := event.ToEnvelopeItem() + if err != nil { + t.Errorf("ToEnvelopeItem() should not error even with unmarshalable data, got: %v", err) return } + if item == nil { + t.Fatal("ToEnvelopeItem() returned nil item") + } + + var payload map[string]interface{} + if err := json.Unmarshal(item.Payload, &payload); err != nil { + t.Fatalf("Failed to unmarshal item payload: %v", err) + } + + extra, exists := payload["extra"].(map[string]interface{}) + if !exists { + t.Fatal("Expected extra field after fallback in ToEnvelopeItem") + } + + info, exists := extra["info"].(string) + if !exists || !strings.Contains(info, "Could not encode original event as JSON") { + t.Fatal("Expected fallback info message in extra field for ToEnvelopeItem") + } +} + +func TestLog_ToEnvelopeItem_And_Getters(t *testing.T) { + ts := time.Unix(1700000000, 500_000_000).UTC() + trace := TraceIDFromHex("d6c4f03650bd47699ec65c84352b6208") + l := &Log{ + Timestamp: ts, + TraceID: trace, + Level: 
LogLevelInfo, + Severity: LogSeverityInfo, + Body: "hello world", + Attributes: map[string]Attribute{ + "k1": {Value: "v1", Type: AttributeString}, + "k2": {Value: int64(42), Type: AttributeInt}, + }, + } + + item, err := l.ToEnvelopeItem() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if item == nil || item.Header == nil { + t.Fatal("expected non-nil envelope item and header") + } + if item.Header.Type != protocol.EnvelopeItemTypeLog { + t.Fatalf("expected log item type, got %q", item.Header.Type) + } + + var payload struct { + Timestamp *float64 `json:"timestamp,omitempty"` + TraceID string `json:"trace_id,omitempty"` + Level string `json:"level"` + Severity int `json:"severity_number,omitempty"` + Body string `json:"body,omitempty"` + Attributes map[string]protocol.LogAttribute `json:"attributes,omitempty"` + } + if err := json.Unmarshal(item.Payload, &payload); err != nil { + t.Fatalf("failed to unmarshal payload: %v", err) + } - if !strings.Contains(info, "Could not encode original event as JSON") { - t.Error("Expected fallback info message in extra field") + if payload.Timestamp == nil { + t.Fatal("expected timestamp to be set") + } + if *payload.Timestamp < 1.7e9 || *payload.Timestamp > 1.700000001e9 { + t.Fatalf("unexpected timestamp: %v", *payload.Timestamp) + } + if payload.TraceID != trace.String() { + t.Fatalf("unexpected trace id: %q", payload.TraceID) + } + if payload.Level != string(LogLevelInfo) { + t.Fatalf("unexpected level: %q", payload.Level) + } + if payload.Severity != LogSeverityInfo { + t.Fatalf("unexpected severity: %d", payload.Severity) + } + if payload.Body != "hello world" { + t.Fatalf("unexpected body: %q", payload.Body) + } + if payload.Attributes["k1"].Type != string(AttributeString) || payload.Attributes["k1"].Value != "v1" { + t.Fatalf("unexpected attribute k1: %+v", payload.Attributes["k1"]) + } + if payload.Attributes["k2"].Type != string(AttributeInt) || payload.Attributes["k2"].Value != float64(42) { + t.Fatalf("unexpected attribute k2: %+v", payload.Attributes["k2"]) + } + + if l.GetCategory() != ratelimit.CategoryLog { + t.Fatalf("unexpected category: %v", l.GetCategory()) + } + if l.GetEventID() != "" { + t.Fatalf("expected empty event id, got %q", l.GetEventID()) + } + if l.GetSdkInfo() != nil { + t.Fatal("expected nil sdk info for logs") + } + if dsc := l.GetDynamicSamplingContext(); dsc != nil { + t.Fatalf("expected nil DSC for logs, got: %+v", dsc) } } diff --git a/internal/http/transport.go b/internal/http/transport.go index 52cb32b41..60a1d636f 100644 --- a/internal/http/transport.go +++ b/internal/http/transport.go @@ -33,6 +33,7 @@ const ( var ( ErrTransportQueueFull = errors.New("transport queue full") ErrTransportClosed = errors.New("transport is closed") + ErrEmptyEnvelope = errors.New("empty envelope provided") ) type TransportOptions struct { @@ -196,31 +197,13 @@ func (t *SyncTransport) SendEnvelope(envelope *protocol.Envelope) error { func (t *SyncTransport) Close() {} -func (t *SyncTransport) SendEvent(event protocol.EnvelopeConvertible) { - envelope, err := event.ToEnvelope(t.dsn) - if err != nil { - debuglog.Printf("Failed to convert to envelope: %v", err) - return - } - - if envelope == nil { - debuglog.Printf("Error: event with empty envelope") - return - } - - if err := t.SendEnvelope(envelope); err != nil { - debuglog.Printf("Error sending the envelope: %v", err) - } -} - func (t *SyncTransport) IsRateLimited(category ratelimit.Category) bool { return t.disabled(category) } func (t *SyncTransport) 
SendEnvelopeWithContext(ctx context.Context, envelope *protocol.Envelope) error { - if envelope == nil { - debuglog.Printf("Error: provided empty envelope") - return nil + if envelope == nil || len(envelope.Items) == 0 { + return ErrEmptyEnvelope } category := categoryFromEnvelope(envelope) @@ -233,6 +216,14 @@ func (t *SyncTransport) SendEnvelopeWithContext(ctx context.Context, envelope *p debuglog.Printf("There was an issue creating the request: %v", err) return err } + debuglog.Printf( + "Sending %s [%s] to %s project: %s", + envelope.Items[0].Header.Type, + envelope.Header.EventID, + t.dsn.GetHost(), + t.dsn.GetProjectID(), + ) + response, err := t.client.Do(request) if err != nil { debuglog.Printf("There was an issue with sending an event: %v", err) @@ -361,6 +352,10 @@ func (t *AsyncTransport) SendEnvelope(envelope *protocol.Envelope) error { default: } + if envelope == nil || len(envelope.Items) == 0 { + return ErrEmptyEnvelope + } + category := categoryFromEnvelope(envelope) if t.isRateLimited(category) { return nil @@ -368,6 +363,13 @@ func (t *AsyncTransport) SendEnvelope(envelope *protocol.Envelope) error { select { case t.queue <- envelope: + debuglog.Printf( + "Sending %s [%s] to %s project: %s", + envelope.Items[0].Header.Type, + envelope.Header.EventID, + t.dsn.GetHost(), + t.dsn.GetProjectID(), + ) return nil default: atomic.AddInt64(&t.droppedCount, 1) @@ -375,23 +377,6 @@ func (t *AsyncTransport) SendEnvelope(envelope *protocol.Envelope) error { } } -func (t *AsyncTransport) SendEvent(event protocol.EnvelopeConvertible) { - envelope, err := event.ToEnvelope(t.dsn) - if err != nil { - debuglog.Printf("Failed to convert to envelope: %v", err) - return - } - - if envelope == nil { - debuglog.Printf("Error: event with empty envelope") - return - } - - if err := t.SendEnvelope(envelope); err != nil { - debuglog.Printf("Error sending the envelope: %v", err) - } -} - func (t *AsyncTransport) Flush(timeout time.Duration) bool { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -404,11 +389,14 @@ func (t *AsyncTransport) FlushWithContext(ctx context.Context) bool { case t.flushRequest <- flushResponse: select { case <-flushResponse: + debuglog.Println("Buffer flushed successfully.") return true case <-ctx.Done(): + debuglog.Println("Failed to flush, buffer timed out.") return false } case <-ctx.Done(): + debuglog.Println("Failed to flush, buffer timed out.") return false } } @@ -550,10 +538,6 @@ func (t *NoopTransport) SendEnvelope(_ *protocol.Envelope) error { return nil } -func (t *NoopTransport) SendEvent(_ protocol.EnvelopeConvertible) { - debuglog.Println("Event dropped due to NoopTransport usage.") -} - func (t *NoopTransport) IsRateLimited(_ ratelimit.Category) bool { return false } diff --git a/internal/http/transport_test.go b/internal/http/transport_test.go index 08c8ef55e..5891572af 100644 --- a/internal/http/transport_test.go +++ b/internal/http/transport_test.go @@ -21,15 +21,6 @@ import ( "go.uber.org/goleak" ) -type mockEnvelopeConvertible struct { - envelope *protocol.Envelope - err error -} - -func (m *mockEnvelopeConvertible) ToEnvelope(_ *protocol.Dsn) (*protocol.Envelope, error) { - return m.envelope, m.err -} - func testEnvelope(itemType protocol.EnvelopeItemType) *protocol.Envelope { return &protocol.Envelope{ Header: &protocol.EnvelopeHeader{ @@ -50,6 +41,7 @@ func testEnvelope(itemType protocol.EnvelopeItemType) *protocol.Envelope { } } +// nolint:gocyclo func TestAsyncTransport_SendEnvelope(t *testing.T) { t.Run("invalid 
DSN", func(t *testing.T) { transport := NewAsyncTransport(TransportOptions{}) @@ -244,61 +236,42 @@ func TestAsyncTransport_SendEnvelope(t *testing.T) { t.Errorf("expected ErrTransportQueueFull, got %v", err) } }) -} - -func TestAsyncTransport_SendEvent(t *testing.T) { - tests := []struct { - name string - event *mockEnvelopeConvertible - }{ - { - name: "conversion error", - event: &mockEnvelopeConvertible{ - envelope: nil, - err: errors.New("conversion error"), - }, - }, - { - name: "nil envelope", - event: &mockEnvelopeConvertible{ - envelope: nil, - err: nil, - }, - }, - { - name: "success", - event: &mockEnvelopeConvertible{ - envelope: testEnvelope(protocol.EnvelopeItemTypeEvent), - err: nil, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusOK) - })) - defer server.Close() + t.Run("FlushMultipleTimes", func(t *testing.T) { + var count int64 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt64(&count, 1) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() - tr := NewAsyncTransport(TransportOptions{ - Dsn: "http://key@" + server.URL[7:] + "/123", - }) - transport, ok := tr.(*AsyncTransport) - if !ok { - t.Fatalf("expected *AsyncTransport, got %T", tr) - } - defer transport.Close() + tr := NewAsyncTransport(TransportOptions{ + Dsn: "http://key@" + server.URL[7:] + "/123", + }) + transport, ok := tr.(*AsyncTransport) + if !ok { + t.Fatalf("expected *AsyncTransport, got %T", tr) + } + defer transport.Close() - transport.SendEvent(tt.event) + if err := transport.SendEnvelope(testEnvelope(protocol.EnvelopeItemTypeEvent)); err != nil { + t.Fatalf("failed to send envelope: %v", err) + } + if !transport.Flush(testutils.FlushTimeout()) { + t.Fatal("Flush timed out") + } - if tt.event.err == nil && tt.event.envelope != nil { - if !transport.Flush(testutils.FlushTimeout()) { - t.Fatal("Flush timed out") - } + initial := atomic.LoadInt64(&count) + for i := 0; i < 10; i++ { + if !transport.Flush(testutils.FlushTimeout()) { + t.Fatalf("Flush %d timed out", i) } - }) - } + } + + if got := atomic.LoadInt64(&count); got != initial { + t.Errorf("expected %d requests after multiple flushes, got %d", initial, got) + } + }) } func TestAsyncTransport_FlushWithContext(t *testing.T) { @@ -455,50 +428,6 @@ func TestSyncTransport_SendEnvelope(t *testing.T) { }) } -func TestSyncTransport_SendEvent(t *testing.T) { - tests := []struct { - name string - event *mockEnvelopeConvertible - }{ - { - name: "conversion error", - event: &mockEnvelopeConvertible{ - envelope: nil, - err: errors.New("conversion error"), - }, - }, - { - name: "nil envelope", - event: &mockEnvelopeConvertible{ - envelope: nil, - err: nil, - }, - }, - { - name: "success", - event: &mockEnvelopeConvertible{ - envelope: testEnvelope(protocol.EnvelopeItemTypeEvent), - err: nil, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(_ *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusOK) - })) - defer server.Close() - - transport := NewSyncTransport(TransportOptions{ - Dsn: "http://key@" + server.URL[7:] + "/123", - }) - - transport.SendEvent(tt.event) - }) - } -} - func TestSyncTransport_Flush(t *testing.T) { transport := NewSyncTransport(TransportOptions{}) diff --git a/internal/protocol/interfaces.go b/internal/protocol/interfaces.go 
index 6f6f29a7a..d2d0e9704 100644 --- a/internal/protocol/interfaces.go +++ b/internal/protocol/interfaces.go @@ -7,12 +7,23 @@ import ( "github.com/getsentry/sentry-go/internal/ratelimit" ) -// EnvelopeConvertible represents any type that can be converted to a Sentry envelope. -// This interface allows the telemetry buffers to be generic while still working with -// concrete types like Event. -type EnvelopeConvertible interface { - // ToEnvelope converts the item to a Sentry envelope. - ToEnvelope(dsn *Dsn) (*Envelope, error) +// EnvelopeItemConvertible represents any type that can be converted to a Sentry envelope item. +// This interface allows the telemetry buffers to work with items that can be batched together. +type EnvelopeItemConvertible interface { + // ToEnvelopeItem converts the item to a Sentry envelope item. + ToEnvelopeItem() (*EnvelopeItem, error) + + // GetCategory returns the rate limit category for this item. + GetCategory() ratelimit.Category + + // GetEventID returns the event ID for this item. + GetEventID() string + + // GetSdkInfo returns SDK information for the envelope header. + GetSdkInfo() *SdkInfo + + // GetDynamicSamplingContext returns trace context for the envelope header. + GetDynamicSamplingContext() map[string]string } // TelemetryTransport represents the envelope-first transport interface. @@ -23,9 +34,6 @@ type TelemetryTransport interface { // backpressure error if the queue is full. SendEnvelope(envelope *Envelope) error - // SendEvent sends an event to Sentry. - SendEvent(event EnvelopeConvertible) - // IsRateLimited checks if a specific category is currently rate limited IsRateLimited(category ratelimit.Category) bool diff --git a/internal/protocol/log_batch.go b/internal/protocol/log_batch.go new file mode 100644 index 000000000..2860ed826 --- /dev/null +++ b/internal/protocol/log_batch.go @@ -0,0 +1,48 @@ +package protocol + +import ( + "encoding/json" + + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +// LogAttribute is the JSON representation for a single log attribute value. +type LogAttribute struct { + Value any `json:"value"` + Type string `json:"type"` +} + +// Logs is a container for multiple log items which knows how to convert +// itself into a single batched log envelope item. 
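+// Each element contributes its own payload; the batched payload has the
+// illustrative shape {"items":[{"level":"info","body":"..."},{...}]}.
+// Logs that fail to convert are skipped, and an empty batch yields a nil item.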
+type Logs []EnvelopeItemConvertible + +func (ls Logs) ToEnvelopeItem() (*EnvelopeItem, error) { + // Convert each log to its JSON representation + items := make([]json.RawMessage, 0, len(ls)) + for _, log := range ls { + envItem, err := log.ToEnvelopeItem() + if err != nil { + continue + } + items = append(items, envItem.Payload) + } + + if len(items) == 0 { + return nil, nil + } + + wrapper := struct { + Items []json.RawMessage `json:"items"` + }{Items: items} + + payload, err := json.Marshal(wrapper) + if err != nil { + return nil, err + } + return NewLogItem(len(ls), payload), nil +} + +func (Logs) GetCategory() ratelimit.Category { return ratelimit.CategoryLog } +func (Logs) GetEventID() string { return "" } +func (Logs) GetSdkInfo() *SdkInfo { return nil } +func (Logs) GetDynamicSamplingContext() map[string]string { return nil } diff --git a/internal/protocol/log_batch_test.go b/internal/protocol/log_batch_test.go new file mode 100644 index 000000000..5ab0ac679 --- /dev/null +++ b/internal/protocol/log_batch_test.go @@ -0,0 +1,52 @@ +package protocol + +import ( + "encoding/json" + "testing" + + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +type dummyLog struct{ body string } + +func (d dummyLog) ToEnvelopeItem() (*EnvelopeItem, error) { + payload := []byte(`{"body":"` + d.body + `"}`) + return &EnvelopeItem{Header: &EnvelopeItemHeader{Type: EnvelopeItemTypeLog}, Payload: payload}, nil +} +func (dummyLog) GetCategory() ratelimit.Category { return ratelimit.CategoryLog } +func (dummyLog) GetEventID() string { return "" } +func (dummyLog) GetSdkInfo() *SdkInfo { return nil } +func (dummyLog) GetDynamicSamplingContext() map[string]string { return nil } + +func TestLogs_ToEnvelopeItem_And_Getters(t *testing.T) { + logs := Logs{dummyLog{body: "a"}, dummyLog{body: "b"}} + item, err := logs.ToEnvelopeItem() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if item == nil || item.Header == nil || item.Header.Type != EnvelopeItemTypeLog { + t.Fatalf("unexpected envelope item: %#v", item) + } + var payload struct { + Items []json.RawMessage `json:"items"` + } + if err := json.Unmarshal(item.Payload, &payload); err != nil { + t.Fatalf("unmarshal failed: %v", err) + } + if len(payload.Items) != 2 { + t.Fatalf("expected 2 items, got %d", len(payload.Items)) + } + + if Logs(nil).GetCategory() != ratelimit.CategoryLog { + t.Fatal("category mismatch") + } + if Logs(nil).GetEventID() != "" { + t.Fatal("event id should be empty") + } + if Logs(nil).GetSdkInfo() != nil { + t.Fatal("sdk info should be nil") + } + if Logs(nil).GetDynamicSamplingContext() != nil { + t.Fatal("dsc should be nil") + } +} diff --git a/internal/protocol/uuid.go b/internal/protocol/uuid.go new file mode 100644 index 000000000..5aff3b19f --- /dev/null +++ b/internal/protocol/uuid.go @@ -0,0 +1,18 @@ +package protocol + +import ( + "crypto/rand" + "encoding/hex" +) + +// GenerateEventID generates a random UUID v4 for use as a Sentry event ID. +func GenerateEventID() string { + id := make([]byte, 16) + // Prefer rand.Read over rand.Reader, see https://go-review.googlesource.com/c/go/+/272326/. 
+ _, _ = rand.Read(id) + id[6] &= 0x0F // clear version + id[6] |= 0x40 // set version to 4 (random uuid) + id[8] &= 0x3F // clear variant + id[8] |= 0x80 // set to IETF variant + return hex.EncodeToString(id) +} diff --git a/internal/protocol/uuid_test.go b/internal/protocol/uuid_test.go new file mode 100644 index 000000000..675140a2d --- /dev/null +++ b/internal/protocol/uuid_test.go @@ -0,0 +1,34 @@ +package protocol + +import ( + "encoding/hex" + "testing" +) + +func TestGenerateEventID_FormatVersionVariant(t *testing.T) { + const n = 100 + seen := make(map[string]struct{}, n) + for i := 0; i < n; i++ { + id := GenerateEventID() + if len(id) != 32 { + t.Fatalf("length = %d, want 32", len(id)) + } + b, err := hex.DecodeString(id) + if err != nil { + t.Fatalf("id not hex: %v", err) + } + if len(b) != 16 { + t.Fatalf("decoded length = %d, want 16", len(b)) + } + if v := b[6] & 0xF0; v != 0x40 { + t.Fatalf("version nibble = 0x%x, want 0x40", v) + } + if v := b[8] & 0xC0; v != 0x80 { + t.Fatalf("variant bits = 0x%x, want 0x80", v) + } + if _, exists := seen[id]; exists { + t.Fatalf("duplicate id generated: %s", id) + } + seen[id] = struct{}{} + } +} diff --git a/internal/telemetry/bucketed_buffer.go b/internal/telemetry/bucketed_buffer.go new file mode 100644 index 000000000..75e621e55 --- /dev/null +++ b/internal/telemetry/bucketed_buffer.go @@ -0,0 +1,398 @@ +package telemetry + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +const ( + defaultBucketedCapacity = 100 + perBucketItemLimit = 100 +) + +type Bucket[T any] struct { + traceID string + items []T + createdAt time.Time + lastUpdatedAt time.Time +} + +// BucketedBuffer groups items by trace id, flushing per bucket. +type BucketedBuffer[T any] struct { + mu sync.RWMutex + + buckets []*Bucket[T] + traceIndex map[string]int + + head int + tail int + + itemCapacity int + bucketCapacity int + + totalItems int + bucketCount int + + category ratelimit.Category + priority ratelimit.Priority + overflowPolicy OverflowPolicy + batchSize int + timeout time.Duration + lastFlushTime time.Time + + offered int64 + dropped int64 + onDropped func(item T, reason string) +} + +func NewBucketedBuffer[T any]( + category ratelimit.Category, + capacity int, + overflowPolicy OverflowPolicy, + batchSize int, + timeout time.Duration, +) *BucketedBuffer[T] { + if capacity <= 0 { + capacity = defaultBucketedCapacity + } + if batchSize <= 0 { + batchSize = 1 + } + if timeout < 0 { + timeout = 0 + } + + bucketCapacity := capacity / 10 + if bucketCapacity < 10 { + bucketCapacity = 10 + } + + return &BucketedBuffer[T]{ + buckets: make([]*Bucket[T], bucketCapacity), + traceIndex: make(map[string]int), + itemCapacity: capacity, + bucketCapacity: bucketCapacity, + category: category, + priority: category.GetPriority(), + overflowPolicy: overflowPolicy, + batchSize: batchSize, + timeout: timeout, + lastFlushTime: time.Now(), + } +} + +func (b *BucketedBuffer[T]) Offer(item T) bool { + atomic.AddInt64(&b.offered, 1) + + traceID := "" + if ta, ok := any(item).(TraceAware); ok { + if tid, hasTrace := ta.GetTraceID(); hasTrace { + traceID = tid + } + } + + b.mu.Lock() + defer b.mu.Unlock() + return b.offerToBucket(item, traceID) +} + +func (b *BucketedBuffer[T]) offerToBucket(item T, traceID string) bool { + if traceID != "" { + if idx, exists := b.traceIndex[traceID]; exists { + bucket := b.buckets[idx] + if len(bucket.items) >= perBucketItemLimit { + delete(b.traceIndex, traceID) + } else { + bucket.items = 
append(bucket.items, item) + bucket.lastUpdatedAt = time.Now() + b.totalItems++ + return true + } + } + } + + if b.totalItems >= b.itemCapacity { + return b.handleOverflow(item, traceID) + } + if b.bucketCount >= b.bucketCapacity { + return b.handleOverflow(item, traceID) + } + + bucket := &Bucket[T]{ + traceID: traceID, + items: []T{item}, + createdAt: time.Now(), + lastUpdatedAt: time.Now(), + } + b.buckets[b.tail] = bucket + if traceID != "" { + b.traceIndex[traceID] = b.tail + } + b.tail = (b.tail + 1) % b.bucketCapacity + b.bucketCount++ + b.totalItems++ + return true +} + +func (b *BucketedBuffer[T]) handleOverflow(item T, traceID string) bool { + switch b.overflowPolicy { + case OverflowPolicyDropOldest: + oldestBucket := b.buckets[b.head] + if oldestBucket == nil { + atomic.AddInt64(&b.dropped, 1) + if b.onDropped != nil { + b.onDropped(item, "buffer_full_invalid_state") + } + return false + } + if oldestBucket.traceID != "" { + delete(b.traceIndex, oldestBucket.traceID) + } + droppedCount := len(oldestBucket.items) + atomic.AddInt64(&b.dropped, int64(droppedCount)) + if b.onDropped != nil { + for _, di := range oldestBucket.items { + b.onDropped(di, "buffer_full_drop_oldest_bucket") + } + } + b.totalItems -= droppedCount + b.bucketCount-- + b.head = (b.head + 1) % b.bucketCapacity + // add new bucket + bucket := &Bucket[T]{traceID: traceID, items: []T{item}, createdAt: time.Now(), lastUpdatedAt: time.Now()} + b.buckets[b.tail] = bucket + if traceID != "" { + b.traceIndex[traceID] = b.tail + } + b.tail = (b.tail + 1) % b.bucketCapacity + b.bucketCount++ + b.totalItems++ + return true + case OverflowPolicyDropNewest: + atomic.AddInt64(&b.dropped, 1) + if b.onDropped != nil { + b.onDropped(item, "buffer_full_drop_newest") + } + return false + default: + atomic.AddInt64(&b.dropped, 1) + if b.onDropped != nil { + b.onDropped(item, "unknown_overflow_policy") + } + return false + } +} + +func (b *BucketedBuffer[T]) Poll() (T, bool) { + b.mu.Lock() + defer b.mu.Unlock() + var zero T + if b.bucketCount == 0 { + return zero, false + } + bucket := b.buckets[b.head] + if bucket == nil || len(bucket.items) == 0 { + return zero, false + } + item := bucket.items[0] + bucket.items = bucket.items[1:] + b.totalItems-- + if len(bucket.items) == 0 { + if bucket.traceID != "" { + delete(b.traceIndex, bucket.traceID) + } + b.buckets[b.head] = nil + b.head = (b.head + 1) % b.bucketCapacity + b.bucketCount-- + } + return item, true +} + +func (b *BucketedBuffer[T]) PollBatch(maxItems int) []T { + if maxItems <= 0 { + return nil + } + b.mu.Lock() + defer b.mu.Unlock() + if b.bucketCount == 0 { + return nil + } + res := make([]T, 0, maxItems) + for len(res) < maxItems && b.bucketCount > 0 { + bucket := b.buckets[b.head] + if bucket == nil { + break + } + n := maxItems - len(res) + if n > len(bucket.items) { + n = len(bucket.items) + } + res = append(res, bucket.items[:n]...) 
+ bucket.items = bucket.items[n:] + b.totalItems -= n + if len(bucket.items) == 0 { + if bucket.traceID != "" { + delete(b.traceIndex, bucket.traceID) + } + b.buckets[b.head] = nil + b.head = (b.head + 1) % b.bucketCapacity + b.bucketCount-- + } + } + return res +} + +func (b *BucketedBuffer[T]) PollIfReady() []T { + b.mu.Lock() + defer b.mu.Unlock() + if b.bucketCount == 0 { + return nil + } + ready := b.totalItems >= b.batchSize || (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout) + if !ready { + return nil + } + oldest := b.buckets[b.head] + if oldest == nil { + return nil + } + items := oldest.items + if oldest.traceID != "" { + delete(b.traceIndex, oldest.traceID) + } + b.buckets[b.head] = nil + b.head = (b.head + 1) % b.bucketCapacity + b.totalItems -= len(items) + b.bucketCount-- + b.lastFlushTime = time.Now() + return items +} + +func (b *BucketedBuffer[T]) Drain() []T { + b.mu.Lock() + defer b.mu.Unlock() + if b.bucketCount == 0 { + return nil + } + res := make([]T, 0, b.totalItems) + for i := 0; i < b.bucketCount; i++ { + idx := (b.head + i) % b.bucketCapacity + bucket := b.buckets[idx] + if bucket != nil { + res = append(res, bucket.items...) + b.buckets[idx] = nil + } + } + b.traceIndex = make(map[string]int) + b.head = 0 + b.tail = 0 + b.totalItems = 0 + b.bucketCount = 0 + return res +} + +func (b *BucketedBuffer[T]) Peek() (T, bool) { + b.mu.RLock() + defer b.mu.RUnlock() + var zero T + if b.bucketCount == 0 { + return zero, false + } + bucket := b.buckets[b.head] + if bucket == nil || len(bucket.items) == 0 { + return zero, false + } + return bucket.items[0], true +} + +func (b *BucketedBuffer[T]) Size() int { b.mu.RLock(); defer b.mu.RUnlock(); return b.totalItems } +func (b *BucketedBuffer[T]) Capacity() int { b.mu.RLock(); defer b.mu.RUnlock(); return b.itemCapacity } +func (b *BucketedBuffer[T]) Category() ratelimit.Category { + b.mu.RLock() + defer b.mu.RUnlock() + return b.category +} +func (b *BucketedBuffer[T]) Priority() ratelimit.Priority { + b.mu.RLock() + defer b.mu.RUnlock() + return b.priority +} +func (b *BucketedBuffer[T]) IsEmpty() bool { + b.mu.RLock() + defer b.mu.RUnlock() + return b.bucketCount == 0 +} +func (b *BucketedBuffer[T]) IsFull() bool { + b.mu.RLock() + defer b.mu.RUnlock() + return b.totalItems >= b.itemCapacity +} +func (b *BucketedBuffer[T]) Utilization() float64 { + b.mu.RLock() + defer b.mu.RUnlock() + if b.itemCapacity == 0 { + return 0 + } + return float64(b.totalItems) / float64(b.itemCapacity) +} +func (b *BucketedBuffer[T]) OfferedCount() int64 { return atomic.LoadInt64(&b.offered) } +func (b *BucketedBuffer[T]) DroppedCount() int64 { return atomic.LoadInt64(&b.dropped) } +func (b *BucketedBuffer[T]) AcceptedCount() int64 { return b.OfferedCount() - b.DroppedCount() } +func (b *BucketedBuffer[T]) DropRate() float64 { + off := b.OfferedCount() + if off == 0 { + return 0 + } + return float64(b.DroppedCount()) / float64(off) +} + +func (b *BucketedBuffer[T]) GetMetrics() BufferMetrics { + b.mu.RLock() + size := b.totalItems + util := 0.0 + if b.itemCapacity > 0 { + util = float64(b.totalItems) / float64(b.itemCapacity) + } + b.mu.RUnlock() + return BufferMetrics{Category: b.category, Priority: b.priority, Capacity: b.itemCapacity, Size: size, Utilization: util, OfferedCount: b.OfferedCount(), DroppedCount: b.DroppedCount(), AcceptedCount: b.AcceptedCount(), DropRate: b.DropRate(), LastUpdated: time.Now()} +} + +func (b *BucketedBuffer[T]) SetDroppedCallback(callback func(item T, reason string)) { + b.mu.Lock() + defer 
b.mu.Unlock() + b.onDropped = callback +} +func (b *BucketedBuffer[T]) Clear() { + b.mu.Lock() + defer b.mu.Unlock() + for i := 0; i < b.bucketCapacity; i++ { + b.buckets[i] = nil + } + b.traceIndex = make(map[string]int) + b.head = 0 + b.tail = 0 + b.totalItems = 0 + b.bucketCount = 0 +} +func (b *BucketedBuffer[T]) IsReadyToFlush() bool { + b.mu.RLock() + defer b.mu.RUnlock() + if b.bucketCount == 0 { + return false + } + if b.totalItems >= b.batchSize { + return true + } + if b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout { + return true + } + return false +} +func (b *BucketedBuffer[T]) MarkFlushed() { + b.mu.Lock() + defer b.mu.Unlock() + b.lastFlushTime = time.Now() +} diff --git a/internal/telemetry/bucketed_buffer_test.go b/internal/telemetry/bucketed_buffer_test.go new file mode 100644 index 000000000..4e3cc004f --- /dev/null +++ b/internal/telemetry/bucketed_buffer_test.go @@ -0,0 +1,262 @@ +package telemetry + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +type tbItem struct { + id int + trace string +} + +func (i tbItem) GetTraceID() (string, bool) { + if i.trace == "" { + return "", false + } + return i.trace, true +} + +func TestBucketedBufferPollOperation(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 3, 0) + if !b.Offer(tbItem{id: 1}) || !b.Offer(tbItem{id: 2}) { + t.Fatal("offer failed") + } + if b.Size() != 2 { + t.Fatalf("size want 2 got %d", b.Size()) + } + if it, ok := b.Poll(); !ok || it.id != 1 { + t.Fatalf("poll got %#v ok=%v", it, ok) + } + if it, ok := b.Poll(); !ok || it.id != 2 { + t.Fatalf("poll got %#v ok=%v", it, ok) + } + if !b.IsEmpty() { + t.Fatal("expected empty after polls") + } +} + +func TestBucketedBufferOverflowDropOldest(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + dropped := 0 + b.SetDroppedCallback(func(_ tbItem, _ string) { dropped++ }) + b.Offer(tbItem{id: 1, trace: "a"}) + b.Offer(tbItem{id: 2, trace: "b"}) + b.Offer(tbItem{id: 3, trace: "c"}) + if !b.Offer(tbItem{id: 4, trace: "d"}) { + t.Fatal("offer should succeed and drop oldest") + } + if dropped == 0 { + t.Fatal("expected at least one dropped callback") + } + if b.Size() != 3 { + t.Fatalf("size should remain at capacity, got %d", b.Size()) + } +} + +func TestBucketedBufferPollIfReady_BatchSize(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 3, 0) + for i := 1; i <= 3; i++ { + b.Offer(tbItem{id: i, trace: "t"}) + } + items := b.PollIfReady() + if len(items) != 3 { + t.Fatalf("expected 3 items, got %d", len(items)) + } + if b.Size() != 0 { + t.Fatalf("expected empty after PollIfReady, size %d", b.Size()) + } +} + +func TestBucketedBufferPollIfReady_Timeout(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 100, 1*time.Millisecond) + b.Offer(tbItem{id: 1, trace: "t"}) + time.Sleep(3 * time.Millisecond) + items := b.PollIfReady() + if len(items) != 1 { + t.Fatalf("expected 1 item due to timeout, got %d", len(items)) + } +} + +func TestNewBucketedBuffer(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryLog, 0, OverflowPolicyDropOldest, 0, -1) + if b.Capacity() != 100 { + t.Fatalf("default capacity want 100 got %d", b.Capacity()) + } + if b.Category() != ratelimit.CategoryLog { + t.Fatalf("category mismatch: %v", b.Category()) + } + if b.Priority() != 
ratelimit.CategoryLog.GetPriority() { + t.Fatalf("priority mismatch: %v", b.Priority()) + } +} + +func TestBucketedBufferBasicOperations(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + if !b.IsEmpty() || b.IsFull() || b.Size() != 0 { + t.Fatalf("unexpected initial state: empty=%v full=%v size=%d", b.IsEmpty(), b.IsFull(), b.Size()) + } + b.Offer(tbItem{id: 1, trace: "t1"}) + if it, ok := b.Peek(); !ok || it.id != 1 { + t.Fatalf("peek got %#v ok=%v", it, ok) + } + // Same-trace aggregation + for i := 2; i <= 3; i++ { + b.Offer(tbItem{id: i, trace: "t1"}) + } + batch := b.PollBatch(3) + if len(batch) != 3 { + t.Fatalf("batch len want 3 got %d", len(batch)) + } +} + +func TestBucketedBufferPollBatchAcrossBuckets(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 10, 0) + // Two buckets with different traces + b.Offer(tbItem{id: 1, trace: "a"}) + b.Offer(tbItem{id: 2, trace: "a"}) + b.Offer(tbItem{id: 3, trace: "b"}) + b.Offer(tbItem{id: 4, trace: "b"}) + + batch := b.PollBatch(3) + if len(batch) != 3 { + t.Fatalf("want 3 got %d", len(batch)) + } +} + +func TestBucketedBufferDrain(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + for i := 1; i <= 5; i++ { + b.Offer(tbItem{id: i, trace: "t"}) + } + items := b.Drain() + if len(items) != 5 { + t.Fatalf("want 5 got %d", len(items)) + } + if !b.IsEmpty() || b.Size() != 0 { + t.Fatalf("buffer not reset after drain, size=%d", b.Size()) + } +} + +func TestBucketedBufferMetrics(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 10, OverflowPolicyDropNewest, 1, 0) + if b.OfferedCount() != 0 || b.DroppedCount() != 0 { + t.Fatalf("initial metrics not zero") + } + for i := 0; i < 12; i++ { // exceed capacity to force drops + b.Offer(tbItem{id: i}) + } + if b.OfferedCount() != 12 { + t.Fatalf("offered want 12 got %d", b.OfferedCount()) + } + if b.DroppedCount() == 0 { + t.Fatalf("expected some drops with DropNewest policy") + } + if b.Utilization() <= 0 || b.Utilization() > 1 { + t.Fatalf("unexpected utilization: %f", b.Utilization()) + } +} + +func TestBucketedBufferOverflowDropNewest(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 2, OverflowPolicyDropNewest, 1, 0) + b.Offer(tbItem{id: 1}) + b.Offer(tbItem{id: 2}) + if ok := b.Offer(tbItem{id: 3}); ok { + t.Fatal("expected offer to fail with drop newest when full") + } +} + +func TestBucketedBufferDroppedCallback(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + calls := 0 + b.SetDroppedCallback(func(_ tbItem, reason string) { + calls++ + if reason != "buffer_full_drop_oldest_bucket" && reason != "buffer_full_invalid_state" && reason != "buffer_full_drop_newest" { + t.Fatalf("unexpected drop reason: %s", reason) + } + }) + // oldest bucket will have 2 items + b.Offer(tbItem{id: 1, trace: "x"}) + b.Offer(tbItem{id: 2, trace: "x"}) + b.Offer(tbItem{id: 3, trace: "y"}) + // overflow should drop bucket with 2 items triggering 2 callbacks + b.Offer(tbItem{id: 4, trace: "z"}) + if calls < 1 { + t.Fatalf("expected at least one drop callback, got %d", calls) + } +} + +func TestBucketedBufferClear(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) + b.Offer(tbItem{id: 1}) + b.Offer(tbItem{id: 2}) + b.Clear() + if !b.IsEmpty() || b.Size() != 0 { + t.Fatalf("expected empty 
after clear") + } +} + +func TestBucketedBufferIsReadyToFlush(t *testing.T) { + tests := []struct { + name string + category ratelimit.Category + items int + timeout time.Duration + expect bool + }{ + {"logs batch reached", ratelimit.CategoryLog, 3, 0, true}, + {"logs batch not reached", ratelimit.CategoryLog, 2, 0, false}, + {"timeout reached", ratelimit.CategoryLog, 1, 2 * time.Millisecond, true}, + {"error batch 1", ratelimit.CategoryError, 1, 0, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + batch := 3 + if tt.category == ratelimit.CategoryError { + batch = 1 + } + b := NewBucketedBuffer[tbItem](tt.category, 10, OverflowPolicyDropOldest, batch, tt.timeout) + for i := 0; i < tt.items; i++ { + b.Offer(tbItem{id: i, trace: "t"}) + } + if tt.timeout > 0 { + time.Sleep(tt.timeout + 1*time.Millisecond) + } + ready := b.IsReadyToFlush() + if ready != tt.expect { + t.Fatalf("ready=%v expect=%v", ready, tt.expect) + } + }) + } +} + +func TestBucketedBufferConcurrency(t *testing.T) { + b := NewBucketedBuffer[tbItem](ratelimit.CategoryError, 200, OverflowPolicyDropOldest, 1, 0) + const producers = 5 + const per = 50 + var wg sync.WaitGroup + wg.Add(producers) + for p := 0; p < producers; p++ { + go func(pid int) { + defer wg.Done() + for j := 0; j < per; j++ { + b.Offer(tbItem{id: pid*per + j}) + } + }(p) + } + wg.Wait() + var polled int64 + for { + if _, ok := b.Poll(); !ok { + break + } + atomic.AddInt64(&polled, 1) + } + if polled == 0 { + t.Fatal("expected to poll some items") + } +} diff --git a/internal/telemetry/buffer.go b/internal/telemetry/buffer.go index f63aeac18..7305d1fc8 100644 --- a/internal/telemetry/buffer.go +++ b/internal/telemetry/buffer.go @@ -10,8 +10,8 @@ import ( const defaultCapacity = 100 -// Buffer is a thread-safe ring buffer with overflow policies. -type Buffer[T any] struct { +// RingBuffer is a thread-safe ring buffer with overflow policies. 
+type RingBuffer[T any] struct { mu sync.RWMutex items []T head int @@ -32,7 +32,7 @@ type Buffer[T any] struct { onDropped func(item T, reason string) } -func NewBuffer[T any](category ratelimit.Category, capacity int, overflowPolicy OverflowPolicy, batchSize int, timeout time.Duration) *Buffer[T] { +func NewRingBuffer[T any](category ratelimit.Category, capacity int, overflowPolicy OverflowPolicy, batchSize int, timeout time.Duration) *RingBuffer[T] { if capacity <= 0 { capacity = defaultCapacity } @@ -45,7 +45,7 @@ func NewBuffer[T any](category ratelimit.Category, capacity int, overflowPolicy timeout = 0 } - return &Buffer[T]{ + return &RingBuffer[T]{ items: make([]T, capacity), capacity: capacity, category: category, @@ -57,13 +57,13 @@ func NewBuffer[T any](category ratelimit.Category, capacity int, overflowPolicy } } -func (b *Buffer[T]) SetDroppedCallback(callback func(item T, reason string)) { +func (b *RingBuffer[T]) SetDroppedCallback(callback func(item T, reason string)) { b.mu.Lock() defer b.mu.Unlock() b.onDropped = callback } -func (b *Buffer[T]) Offer(item T) bool { +func (b *RingBuffer[T]) Offer(item T) bool { atomic.AddInt64(&b.offered, 1) b.mu.Lock() @@ -105,7 +105,7 @@ func (b *Buffer[T]) Offer(item T) bool { } } -func (b *Buffer[T]) Poll() (T, bool) { +func (b *RingBuffer[T]) Poll() (T, bool) { b.mu.Lock() defer b.mu.Unlock() @@ -122,7 +122,7 @@ func (b *Buffer[T]) Poll() (T, bool) { return item, true } -func (b *Buffer[T]) PollBatch(maxItems int) []T { +func (b *RingBuffer[T]) PollBatch(maxItems int) []T { if maxItems <= 0 { return nil } @@ -152,7 +152,7 @@ func (b *Buffer[T]) PollBatch(maxItems int) []T { return result } -func (b *Buffer[T]) Drain() []T { +func (b *RingBuffer[T]) Drain() []T { b.mu.Lock() defer b.mu.Unlock() @@ -178,7 +178,7 @@ func (b *Buffer[T]) Drain() []T { return result } -func (b *Buffer[T]) Peek() (T, bool) { +func (b *RingBuffer[T]) Peek() (T, bool) { b.mu.RLock() defer b.mu.RUnlock() @@ -190,61 +190,61 @@ func (b *Buffer[T]) Peek() (T, bool) { return b.items[b.head], true } -func (b *Buffer[T]) Size() int { +func (b *RingBuffer[T]) Size() int { b.mu.RLock() defer b.mu.RUnlock() return b.size } -func (b *Buffer[T]) Capacity() int { +func (b *RingBuffer[T]) Capacity() int { b.mu.RLock() defer b.mu.RUnlock() return b.capacity } -func (b *Buffer[T]) Category() ratelimit.Category { +func (b *RingBuffer[T]) Category() ratelimit.Category { b.mu.RLock() defer b.mu.RUnlock() return b.category } -func (b *Buffer[T]) Priority() ratelimit.Priority { +func (b *RingBuffer[T]) Priority() ratelimit.Priority { b.mu.RLock() defer b.mu.RUnlock() return b.priority } -func (b *Buffer[T]) IsEmpty() bool { +func (b *RingBuffer[T]) IsEmpty() bool { b.mu.RLock() defer b.mu.RUnlock() return b.size == 0 } -func (b *Buffer[T]) IsFull() bool { +func (b *RingBuffer[T]) IsFull() bool { b.mu.RLock() defer b.mu.RUnlock() return b.size == b.capacity } -func (b *Buffer[T]) Utilization() float64 { +func (b *RingBuffer[T]) Utilization() float64 { b.mu.RLock() defer b.mu.RUnlock() return float64(b.size) / float64(b.capacity) } -func (b *Buffer[T]) OfferedCount() int64 { +func (b *RingBuffer[T]) OfferedCount() int64 { return atomic.LoadInt64(&b.offered) } -func (b *Buffer[T]) DroppedCount() int64 { +func (b *RingBuffer[T]) DroppedCount() int64 { return atomic.LoadInt64(&b.dropped) } -func (b *Buffer[T]) AcceptedCount() int64 { +func (b *RingBuffer[T]) AcceptedCount() int64 { return b.OfferedCount() - b.DroppedCount() } -func (b *Buffer[T]) DropRate() float64 { +func (b 
*RingBuffer[T]) DropRate() float64 { offered := b.OfferedCount() if offered == 0 { return 0.0 @@ -252,7 +252,7 @@ func (b *Buffer[T]) DropRate() float64 { return float64(b.DroppedCount()) / float64(offered) } -func (b *Buffer[T]) Clear() { +func (b *RingBuffer[T]) Clear() { b.mu.Lock() defer b.mu.Unlock() @@ -266,7 +266,7 @@ func (b *Buffer[T]) Clear() { b.size = 0 } -func (b *Buffer[T]) GetMetrics() BufferMetrics { +func (b *RingBuffer[T]) GetMetrics() BufferMetrics { b.mu.RLock() size := b.size util := float64(b.size) / float64(b.capacity) @@ -286,7 +286,7 @@ func (b *Buffer[T]) GetMetrics() BufferMetrics { } } -func (b *Buffer[T]) IsReadyToFlush() bool { +func (b *RingBuffer[T]) IsReadyToFlush() bool { b.mu.RLock() defer b.mu.RUnlock() @@ -305,13 +305,13 @@ func (b *Buffer[T]) IsReadyToFlush() bool { return false } -func (b *Buffer[T]) MarkFlushed() { +func (b *RingBuffer[T]) MarkFlushed() { b.mu.Lock() defer b.mu.Unlock() b.lastFlushTime = time.Now() } -func (b *Buffer[T]) PollIfReady() []T { +func (b *RingBuffer[T]) PollIfReady() []T { b.mu.Lock() defer b.mu.Unlock() diff --git a/internal/telemetry/buffer_test.go b/internal/telemetry/buffer_test.go index ed6bfc976..afc922310 100644 --- a/internal/telemetry/buffer_test.go +++ b/internal/telemetry/buffer_test.go @@ -14,9 +14,9 @@ type testItem struct { data string } -func TestNewBuffer(t *testing.T) { +func TestNewRingBuffer(t *testing.T) { t.Run("with valid capacity", func(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 50, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 50, OverflowPolicyDropOldest, 1, 0) if buffer.Capacity() != 50 { t.Errorf("Expected capacity 50, got %d", buffer.Capacity()) } @@ -29,14 +29,14 @@ func TestNewBuffer(t *testing.T) { }) t.Run("with zero capacity", func(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryLog, 0, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryLog, 0, OverflowPolicyDropOldest, 1, 0) if buffer.Capacity() != 100 { t.Errorf("Expected default capacity 100, got %d", buffer.Capacity()) } }) t.Run("with negative capacity", func(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryLog, -10, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryLog, -10, OverflowPolicyDropOldest, 1, 0) if buffer.Capacity() != 100 { t.Errorf("Expected default capacity 100, got %d", buffer.Capacity()) } @@ -44,7 +44,7 @@ func TestNewBuffer(t *testing.T) { } func TestBufferBasicOperations(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) // Test empty buffer if !buffer.IsEmpty() { @@ -83,7 +83,7 @@ func TestBufferBasicOperations(t *testing.T) { } func TestBufferPollOperation(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) // Test polling from empty buffer item, ok := buffer.Poll() @@ -126,7 +126,7 @@ func TestBufferPollOperation(t *testing.T) { } func TestBufferOverflow(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Fill buffer to capacity item1 := &testItem{id: 1, data: "first"} 
@@ -170,7 +170,7 @@ func TestBufferOverflow(t *testing.T) { } func TestBufferDrain(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) // Drain empty buffer items := buffer.Drain() @@ -206,7 +206,7 @@ func TestBufferDrain(t *testing.T) { } func TestBufferMetrics(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Initial metrics if buffer.OfferedCount() != 0 { @@ -230,7 +230,7 @@ func TestBufferMetrics(t *testing.T) { } func TestBufferConcurrency(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 100, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 100, OverflowPolicyDropOldest, 1, 0) const numGoroutines = 10 const itemsPerGoroutine = 50 @@ -301,7 +301,7 @@ func TestBufferDifferentCategories(t *testing.T) { for _, tc := range testCases { t.Run(string(tc.category), func(t *testing.T) { - buffer := NewBuffer[*testItem](tc.category, 10, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](tc.category, 10, OverflowPolicyDropOldest, 1, 0) if buffer.Category() != tc.category { t.Errorf("Expected category %s, got %s", tc.category, buffer.Category()) } @@ -317,7 +317,7 @@ func TestBufferStressTest(t *testing.T) { t.Skip("Skipping stress test in short mode") } - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 1000, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 1000, OverflowPolicyDropOldest, 1, 0) const duration = 100 * time.Millisecond const numProducers = 5 @@ -394,7 +394,7 @@ func TestBufferStressTest(t *testing.T) { } func TestOverflowPolicyDropOldest(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Fill buffer to capacity item1 := &testItem{id: 1, data: "first"} @@ -434,7 +434,7 @@ func TestOverflowPolicyDropOldest(t *testing.T) { } func TestOverflowPolicyDropNewest(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropNewest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropNewest, 1, 0) // Fill buffer to capacity item1 := &testItem{id: 1, data: "first"} @@ -474,7 +474,7 @@ func TestOverflowPolicyDropNewest(t *testing.T) { } func TestBufferDroppedCallback(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) var droppedItems []*testItem var dropReasons []string @@ -512,7 +512,7 @@ func TestBufferDroppedCallback(t *testing.T) { } func TestBufferPollBatch(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 5, OverflowPolicyDropOldest, 1, 0) // Add some items for i := 1; i <= 5; i++ { @@ -540,7 +540,7 @@ func TestBufferPollBatch(t *testing.T) { } func TestBufferPeek(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, 
OverflowPolicyDropOldest, 1, 0) // Test peek on empty buffer _, ok := buffer.Peek() @@ -567,7 +567,7 @@ func TestBufferPeek(t *testing.T) { } func TestBufferAdvancedMetrics(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 2, OverflowPolicyDropOldest, 1, 0) // Test initial metrics metrics := buffer.GetMetrics() @@ -611,7 +611,7 @@ func TestBufferAdvancedMetrics(t *testing.T) { } func TestBufferClear(t *testing.T) { - buffer := NewBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) + buffer := NewRingBuffer[*testItem](ratelimit.CategoryError, 3, OverflowPolicyDropOldest, 1, 0) // Add some items buffer.Offer(&testItem{id: 1, data: "test"}) @@ -700,7 +700,7 @@ func TestBufferIsReadyToFlush(t *testing.T) { batchSize = 100 timeout = 5 * time.Second } - buffer := NewBuffer[*testItem](tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) + buffer := NewRingBuffer[*testItem](tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) for i := 0; i < tt.itemsToAdd; i++ { buffer.Offer(&testItem{id: i, data: "test"}) @@ -771,7 +771,7 @@ func TestBufferPollIfReady(t *testing.T) { batchSize = 100 timeout = 5 * time.Second } - buffer := NewBuffer[*testItem](tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) + buffer := NewRingBuffer[*testItem](tt.category, 200, OverflowPolicyDropOldest, batchSize, timeout) for i := 0; i < tt.itemsToAdd; i++ { buffer.Offer(&testItem{id: i, data: "test"}) diff --git a/internal/telemetry/buffer_wrapper.go b/internal/telemetry/buffer_wrapper.go new file mode 100644 index 000000000..072d560df --- /dev/null +++ b/internal/telemetry/buffer_wrapper.go @@ -0,0 +1,61 @@ +package telemetry + +import ( + "context" + "time" + + "github.com/getsentry/sentry-go/internal/protocol" + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +// Buffer is the top-level buffer that wraps the scheduler and category buffers. +type Buffer struct { + scheduler *Scheduler + storage map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible] +} + +// NewBuffer creates a new Buffer with the given configuration. +func NewBuffer( + storage map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible], + transport protocol.TelemetryTransport, + dsn *protocol.Dsn, + sdkInfo *protocol.SdkInfo, +) *Buffer { + scheduler := NewScheduler(storage, transport, dsn, sdkInfo) + scheduler.Start() + + return &Buffer{ + scheduler: scheduler, + storage: storage, + } +} + +// Add adds an EnvelopeItemConvertible to the appropriate buffer based on its category. +func (b *Buffer) Add(item protocol.EnvelopeItemConvertible) bool { + category := item.GetCategory() + buffer, exists := b.storage[category] + if !exists { + return false + } + + accepted := buffer.Offer(item) + if accepted { + b.scheduler.Signal() + } + return accepted +} + +// Flush forces all buffers to flush within the given timeout. +func (b *Buffer) Flush(timeout time.Duration) bool { + return b.scheduler.Flush(timeout) +} + +// FlushWithContext flushes with a custom context for cancellation. +func (b *Buffer) FlushWithContext(ctx context.Context) bool { + return b.scheduler.FlushWithContext(ctx) +} + +// Close stops the buffer, flushes remaining data, and releases resources. 
+func (b *Buffer) Close(timeout time.Duration) { + b.scheduler.Stop(timeout) +} diff --git a/internal/telemetry/buffer_wrapper_test.go b/internal/telemetry/buffer_wrapper_test.go new file mode 100644 index 000000000..32c5d54f6 --- /dev/null +++ b/internal/telemetry/buffer_wrapper_test.go @@ -0,0 +1,57 @@ +package telemetry + +import ( + "context" + "testing" + + "github.com/getsentry/sentry-go/internal/protocol" + "github.com/getsentry/sentry-go/internal/ratelimit" + "github.com/getsentry/sentry-go/internal/testutils" +) + +type bwItem struct{ id string } + +func (b bwItem) ToEnvelopeItem() (*protocol.EnvelopeItem, error) { + return &protocol.EnvelopeItem{Header: &protocol.EnvelopeItemHeader{Type: protocol.EnvelopeItemTypeEvent}, Payload: []byte(`{"message":"ok"}`)}, nil +} +func (b bwItem) GetCategory() ratelimit.Category { return ratelimit.CategoryError } +func (b bwItem) GetEventID() string { return b.id } +func (b bwItem) GetSdkInfo() *protocol.SdkInfo { return &protocol.SdkInfo{Name: "t", Version: "1"} } +func (b bwItem) GetDynamicSamplingContext() map[string]string { return nil } + +func TestBuffer_Add_MissingCategory(t *testing.T) { + transport := &testutils.MockTelemetryTransport{} + dsn := &protocol.Dsn{} + sdk := &protocol.SdkInfo{Name: "s", Version: "v"} + storage := map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{} + + b := NewBuffer(storage, transport, dsn, sdk) + ok := b.Add(bwItem{id: "1"}) + if ok { + t.Fatal("expected Add to return false without storage for category") + } + b.Close(testutils.FlushTimeout()) +} + +func TestBuffer_AddAndFlush_Sends(t *testing.T) { + transport := &testutils.MockTelemetryTransport{} + dsn := &protocol.Dsn{} + sdk := &protocol.SdkInfo{Name: "s", Version: "v"} + storage := map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + } + b := NewBuffer(storage, transport, dsn, sdk) + if !b.Add(bwItem{id: "1"}) { + t.Fatal("add failed") + } + if ok := b.Flush(testutils.FlushTimeout()); !ok { + t.Fatal("flush returned false") + } + if ok := b.FlushWithContext(context.Background()); !ok { + t.Fatal("flush returned false") + } + if transport.GetSendCount() == 0 { + t.Fatal("expected at least one send") + } + b.Close(testutils.FlushTimeout()) +} diff --git a/internal/telemetry/scheduler.go b/internal/telemetry/scheduler.go new file mode 100644 index 000000000..0caaac8c2 --- /dev/null +++ b/internal/telemetry/scheduler.go @@ -0,0 +1,264 @@ +package telemetry + +import ( + "context" + "sync" + "time" + + "github.com/getsentry/sentry-go/internal/debuglog" + "github.com/getsentry/sentry-go/internal/protocol" + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +// Scheduler implements a weighted round-robin scheduler for processing buffered events. 
+type Scheduler struct { + buffers map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible] + transport protocol.TelemetryTransport + dsn *protocol.Dsn + sdkInfo *protocol.SdkInfo + + currentCycle []ratelimit.Priority + cyclePos int + + ctx context.Context + cancel context.CancelFunc + processingWg sync.WaitGroup + + mu sync.Mutex + cond *sync.Cond + startOnce sync.Once + finishOnce sync.Once +} + +func NewScheduler( + buffers map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible], + transport protocol.TelemetryTransport, + dsn *protocol.Dsn, + sdkInfo *protocol.SdkInfo, +) *Scheduler { + ctx, cancel := context.WithCancel(context.Background()) + + priorityWeights := map[ratelimit.Priority]int{ + ratelimit.PriorityCritical: 5, + ratelimit.PriorityHigh: 4, + ratelimit.PriorityMedium: 3, + ratelimit.PriorityLow: 2, + ratelimit.PriorityLowest: 1, + } + + var currentCycle []ratelimit.Priority + for priority, weight := range priorityWeights { + hasBuffers := false + for _, buffer := range buffers { + if buffer.Priority() == priority { + hasBuffers = true + break + } + } + + if hasBuffers { + for i := 0; i < weight; i++ { + currentCycle = append(currentCycle, priority) + } + } + } + + s := &Scheduler{ + buffers: buffers, + transport: transport, + dsn: dsn, + sdkInfo: sdkInfo, + currentCycle: currentCycle, + ctx: ctx, + cancel: cancel, + } + s.cond = sync.NewCond(&s.mu) + + return s +} + +func (s *Scheduler) Start() { + s.startOnce.Do(func() { + s.processingWg.Add(1) + go s.run() + }) +} + +func (s *Scheduler) Stop(timeout time.Duration) { + s.finishOnce.Do(func() { + s.Flush(timeout) + + s.cancel() + s.cond.Broadcast() + + done := make(chan struct{}) + go func() { + defer close(done) + s.processingWg.Wait() + }() + + select { + case <-done: + case <-time.After(timeout): + debuglog.Printf("scheduler stop timed out after %v", timeout) + } + }) +} + +func (s *Scheduler) Signal() { + s.cond.Signal() +} + +func (s *Scheduler) Flush(timeout time.Duration) bool { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return s.FlushWithContext(ctx) +} + +func (s *Scheduler) FlushWithContext(ctx context.Context) bool { + s.flushBuffers() + return s.transport.FlushWithContext(ctx) +} + +func (s *Scheduler) run() { + defer s.processingWg.Done() + + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.cond.Broadcast() + case <-s.ctx.Done(): + return + } + } + }() + + for { + s.mu.Lock() + + for !s.hasWork() && s.ctx.Err() == nil { + s.cond.Wait() + } + + if s.ctx.Err() != nil { + s.mu.Unlock() + return + } + + s.mu.Unlock() + s.processNextBatch() + } +} + +func (s *Scheduler) hasWork() bool { + for _, buffer := range s.buffers { + if !buffer.IsEmpty() { + return true + } + } + return false +} + +func (s *Scheduler) processNextBatch() { + if len(s.currentCycle) == 0 { + return + } + + priority := s.currentCycle[s.cyclePos] + s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle) + + var bufferToProcess Storage[protocol.EnvelopeItemConvertible] + var categoryToProcess ratelimit.Category + for category, buffer := range s.buffers { + if buffer.Priority() == priority && !s.isRateLimited(category) && buffer.IsReadyToFlush() { + bufferToProcess = buffer + categoryToProcess = category + break + } + } + + if bufferToProcess != nil { + s.processItems(bufferToProcess, categoryToProcess, false) + } +} + +func (s *Scheduler) processItems(buffer Storage[protocol.EnvelopeItemConvertible], category 
ratelimit.Category, force bool) { + var items []protocol.EnvelopeItemConvertible + + if force { + items = buffer.Drain() + } else { + items = buffer.PollIfReady() + } + + if len(items) == 0 { + return + } + + switch category { + case ratelimit.CategoryLog: + logs := protocol.Logs(items) + header := &protocol.EnvelopeHeader{EventID: protocol.GenerateEventID(), SentAt: time.Now(), Sdk: s.sdkInfo} + if s.dsn != nil { + header.Dsn = s.dsn.String() + } + envelope := protocol.NewEnvelope(header) + item, err := logs.ToEnvelopeItem() + if err != nil { + debuglog.Printf("error creating log batch envelope item: %v", err) + return + } + envelope.AddItem(item) + if err := s.transport.SendEnvelope(envelope); err != nil { + debuglog.Printf("error sending envelope: %v", err) + } + return + default: + // if the buffers are properly configured, buffer.PollIfReady should return a single item for every category + // other than logs. We still iterate over the items just in case, because we don't want to send broken envelopes. + for _, it := range items { + s.sendItem(it) + } + } +} + +func (s *Scheduler) sendItem(item protocol.EnvelopeItemConvertible) { + header := &protocol.EnvelopeHeader{ + EventID: item.GetEventID(), + SentAt: time.Now(), + Trace: item.GetDynamicSamplingContext(), + Sdk: s.sdkInfo, + } + if header.EventID == "" { + header.EventID = protocol.GenerateEventID() + } + if s.dsn != nil { + header.Dsn = s.dsn.String() + } + envelope := protocol.NewEnvelope(header) + envItem, err := item.ToEnvelopeItem() + if err != nil { + debuglog.Printf("error converting item to envelope: %v", err) + return + } + envelope.AddItem(envItem) + if err := s.transport.SendEnvelope(envelope); err != nil { + debuglog.Printf("error sending envelope: %v", err) + } +} + +func (s *Scheduler) flushBuffers() { + for category, buffer := range s.buffers { + if !buffer.IsEmpty() { + s.processItems(buffer, category, true) + } + } +} + +func (s *Scheduler) isRateLimited(category ratelimit.Category) bool { + return s.transport.IsRateLimited(category) +} diff --git a/internal/telemetry/scheduler_test.go b/internal/telemetry/scheduler_test.go new file mode 100644 index 000000000..71ba9f8e2 --- /dev/null +++ b/internal/telemetry/scheduler_test.go @@ -0,0 +1,288 @@ +package telemetry + +import ( + "testing" + "time" + + "github.com/getsentry/sentry-go/internal/protocol" + "github.com/getsentry/sentry-go/internal/ratelimit" + "github.com/getsentry/sentry-go/internal/testutils" +) + +type testTelemetryItem struct { + id int + data string + category ratelimit.Category +} + +func (t *testTelemetryItem) ToEnvelopeItem() (*protocol.EnvelopeItem, error) { + var payload string + if t.GetCategory() == ratelimit.CategoryLog { + payload = `{"type": "log", "timestamp": "2023-01-01T00:00:00Z", "logs": [{"level": "info", "body": "` + t.data + `"}]}` + } else { + payload = `{"message": "` + t.data + `"}` + } + + return &protocol.EnvelopeItem{ + Header: &protocol.EnvelopeItemHeader{ + Type: protocol.EnvelopeItemTypeEvent, + }, + Payload: []byte(payload), + }, nil +} + +func (t *testTelemetryItem) GetCategory() ratelimit.Category { + if t.category != "" { + return t.category + } + return ratelimit.CategoryError +} + +func (t *testTelemetryItem) GetEventID() string { + return t.data +} + +func (t *testTelemetryItem) GetSdkInfo() *protocol.SdkInfo { + return &protocol.SdkInfo{ + Name: "test", + Version: "1.0.0", + } +} + +func (t *testTelemetryItem) GetDynamicSamplingContext() map[string]string { + return nil +} + +func TestNewTelemetryScheduler(t 
*testing.T) { + transport := &testutils.MockTelemetryTransport{} + dsn := &protocol.Dsn{} + + buffers := map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + } + + sdkInfo := &protocol.SdkInfo{ + Name: "test-sdk", + Version: "1.0.0", + } + + scheduler := NewScheduler(buffers, transport, dsn, sdkInfo) + + if scheduler == nil { + t.Fatal("Expected non-nil scheduler") + } + + if len(scheduler.buffers) != 1 { + t.Errorf("Expected 1 buffer, got %d", len(scheduler.buffers)) + } + + if scheduler.dsn != dsn { + t.Error("Expected DSN to be set correctly") + } + + if len(scheduler.currentCycle) == 0 { + t.Error("Expected non-empty priority cycle") + } + + criticalCount := 0 + mediumCount := 0 + for _, priority := range scheduler.currentCycle { + switch priority { + case ratelimit.PriorityCritical: + criticalCount++ + case ratelimit.PriorityMedium: + mediumCount++ + } + } + + if criticalCount <= mediumCount { + t.Errorf("Expected more critical priority slots (%d) than medium (%d)", criticalCount, mediumCount) + } +} + +func TestTelemetrySchedulerFlush(t *testing.T) { + tests := []struct { + name string + setupBuffers func() map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible] + addItems func(buffers map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]) + expectedCount int64 + }{ + { + name: "single category with multiple items", + setupBuffers: func() map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible] { + return map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + } + }, + addItems: func(buffers map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]) { + for i := 1; i <= 5; i++ { + buffers[ratelimit.CategoryError].Offer(&testTelemetryItem{id: i, data: "test"}) + } + }, + expectedCount: 5, + }, + { + name: "empty buffers", + setupBuffers: func() map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible] { + return map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + } + }, + addItems: func(_ map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]) { + }, + expectedCount: 0, + }, + { + name: "multiple categories", + setupBuffers: func() map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible] { + return map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryTransaction: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryTransaction, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryMonitor: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryMonitor, 10, OverflowPolicyDropOldest, 1, 0), + } + }, + addItems: func(buffers map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]) { + i := 0 + for category, buffer := range buffers { + buffer.Offer(&testTelemetryItem{id: i + 1, data: string(category), category: category}) + i++ + } + }, + expectedCount: 3, + }, + { + name: "priority ordering - error and log", + setupBuffers: func() 
map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible] { + return map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0), + ratelimit.CategoryLog: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 10, OverflowPolicyDropOldest, 100, 5*time.Second), + } + }, + addItems: func(buffers map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]) { + buffers[ratelimit.CategoryError].Offer(&testTelemetryItem{id: 1, data: "error", category: ratelimit.CategoryError}) + // simulate a log item (will be batched) + buffers[ratelimit.CategoryLog].Offer(&testTelemetryItem{id: 2, data: "log", category: ratelimit.CategoryLog}) + }, + expectedCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + transport := &testutils.MockTelemetryTransport{} + dsn := &protocol.Dsn{} + sdkInfo := &protocol.SdkInfo{Name: "test-sdk", Version: "1.0.0"} + + buffers := tt.setupBuffers() + scheduler := NewScheduler(buffers, transport, dsn, sdkInfo) + + tt.addItems(buffers) + + scheduler.Flush(time.Second) + + if transport.GetSendCount() != tt.expectedCount { + t.Errorf("Expected %d items to be processed, got %d", tt.expectedCount, transport.GetSendCount()) + } + + for category, buffer := range buffers { + if !buffer.IsEmpty() { + t.Errorf("Expected buffer %s to be empty after flush", category) + } + } + }) + } +} + +func TestTelemetrySchedulerRateLimiting(t *testing.T) { + transport := &testutils.MockTelemetryTransport{} + dsn := &protocol.Dsn{} + + buffer := NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + buffers := map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: buffer, + } + // no log buffer used in simplified scheduler tests + sdkInfo := &protocol.SdkInfo{Name: "test-sdk", Version: "1.0.0"} + + scheduler := NewScheduler(buffers, transport, dsn, sdkInfo) + + transport.SetRateLimited("error", true) + + scheduler.Start() + defer scheduler.Stop(100 * time.Millisecond) + + item := &testTelemetryItem{id: 1, data: "test"} + buffer.Offer(item) + scheduler.Signal() + + time.Sleep(200 * time.Millisecond) + + if transport.GetSendCount() > 0 { + t.Errorf("Expected 0 items to be processed due to rate limiting, got %d", transport.GetSendCount()) + } + + if transport.GetRateLimitedCalls() == 0 { + t.Error("Expected rate limit check to be called") + } +} + +func TestTelemetrySchedulerStartStop(t *testing.T) { + transport := &testutils.MockTelemetryTransport{} + dsn := &protocol.Dsn{} + + buffer := NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + buffers := map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: buffer, + } + // no log buffer used in simplified scheduler tests + sdkInfo := &protocol.SdkInfo{Name: "test-sdk", Version: "1.0.0"} + + scheduler := NewScheduler(buffers, transport, dsn, sdkInfo) + + scheduler.Start() + scheduler.Start() + + item := &testTelemetryItem{id: 1, data: "test"} + buffer.Offer(item) + scheduler.Signal() + + scheduler.Stop(time.Second) + scheduler.Stop(time.Second) + + if transport.GetSendCount() == 0 { + t.Error("Expected at least 1 item to be processed") + } +} + +func TestTelemetrySchedulerContextCancellation(t *testing.T) { + transport := &testutils.MockTelemetryTransport{} + dsn := 
&protocol.Dsn{} + + buffer := NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 10, OverflowPolicyDropOldest, 1, 0) + buffers := map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{ + ratelimit.CategoryError: buffer, + } + sdkInfo := &protocol.SdkInfo{Name: "test-sdk", Version: "1.0.0"} + + scheduler := NewScheduler(buffers, transport, dsn, sdkInfo) + + scheduler.Start() + + for i := 1; i <= 5; i++ { + item := &testTelemetryItem{id: i, data: "test"} + buffer.Offer(item) + } + scheduler.Signal() + + done := make(chan struct{}) + go func() { + defer close(done) + scheduler.Stop(100 * time.Millisecond) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Error("Scheduler stop took too long") + } +} diff --git a/internal/telemetry/storage.go b/internal/telemetry/storage.go new file mode 100644 index 000000000..5621e333c --- /dev/null +++ b/internal/telemetry/storage.go @@ -0,0 +1,42 @@ +package telemetry + +import ( + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +// Storage defines the common interface for all buffer storage implementations. +type Storage[T any] interface { + // Core operations + Offer(item T) bool + Poll() (T, bool) + PollBatch(maxItems int) []T + PollIfReady() []T + Drain() []T + Peek() (T, bool) + + // State queries + Size() int + Capacity() int + IsEmpty() bool + IsFull() bool + Utilization() float64 + + // Flush management + IsReadyToFlush() bool + MarkFlushed() + + // Category/Priority + Category() ratelimit.Category + Priority() ratelimit.Priority + + // Metrics + OfferedCount() int64 + DroppedCount() int64 + AcceptedCount() int64 + DropRate() float64 + GetMetrics() BufferMetrics + + // Configuration + SetDroppedCallback(callback func(item T, reason string)) + Clear() +} diff --git a/internal/telemetry/trace_aware.go b/internal/telemetry/trace_aware.go new file mode 100644 index 000000000..a32103371 --- /dev/null +++ b/internal/telemetry/trace_aware.go @@ -0,0 +1,7 @@ +package telemetry + +// TraceAware is implemented by items that can expose a trace ID. +// BucketedBuffer uses this to group items by trace. 
+type TraceAware interface { + GetTraceID() (string, bool) +} diff --git a/internal/testutils/mocks.go b/internal/testutils/mocks.go new file mode 100644 index 000000000..1938cc4be --- /dev/null +++ b/internal/testutils/mocks.go @@ -0,0 +1,93 @@ +package testutils + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/getsentry/sentry-go/internal/protocol" + "github.com/getsentry/sentry-go/internal/ratelimit" +) + +type MockTelemetryTransport struct { + sentEnvelopes []*protocol.Envelope + rateLimited map[string]bool + sendError error + mu sync.Mutex + sendCount int64 + rateLimitedCalls int64 +} + +func (m *MockTelemetryTransport) SendEnvelope(envelope *protocol.Envelope) error { + atomic.AddInt64(&m.sendCount, 1) + m.mu.Lock() + defer m.mu.Unlock() + + if m.sendError != nil { + return m.sendError + } + + m.sentEnvelopes = append(m.sentEnvelopes, envelope) + return nil +} + +func (m *MockTelemetryTransport) IsRateLimited(category ratelimit.Category) bool { + atomic.AddInt64(&m.rateLimitedCalls, 1) + m.mu.Lock() + defer m.mu.Unlock() + + if m.rateLimited == nil { + return false + } + return m.rateLimited[string(category)] +} + +func (m *MockTelemetryTransport) Flush(_ time.Duration) bool { + return true +} + +func (m *MockTelemetryTransport) FlushWithContext(_ context.Context) bool { + return true +} + +func (m *MockTelemetryTransport) Configure(_ interface{}) error { + return nil +} + +func (m *MockTelemetryTransport) Close() { +} + +func (m *MockTelemetryTransport) GetSentEnvelopes() []*protocol.Envelope { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]*protocol.Envelope, len(m.sentEnvelopes)) + copy(result, m.sentEnvelopes) + return result +} + +func (m *MockTelemetryTransport) SetRateLimited(category string, limited bool) { + m.mu.Lock() + defer m.mu.Unlock() + if m.rateLimited == nil { + m.rateLimited = make(map[string]bool) + } + m.rateLimited[category] = limited +} + +func (m *MockTelemetryTransport) GetSendCount() int64 { + return atomic.LoadInt64(&m.sendCount) +} + +func (m *MockTelemetryTransport) GetRateLimitedCalls() int64 { + return atomic.LoadInt64(&m.rateLimitedCalls) +} + +func (m *MockTelemetryTransport) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.sentEnvelopes = nil + m.rateLimited = nil + atomic.StoreInt64(&m.sendCount, 0) + atomic.StoreInt64(&m.rateLimitedCalls, 0) +} diff --git a/log.go b/log.go index 58e52f7b4..1918a9982 100644 --- a/log.go +++ b/log.go @@ -66,7 +66,7 @@ func NewLogger(ctx context.Context) Logger { } client := hub.Client() - if client != nil && client.batchLogger != nil { + if client != nil && client.options.EnableLogs && (client.batchLogger != nil || client.telemetryBuffer != nil) { return &sentryLogger{ ctx: ctx, client: client, @@ -76,11 +76,10 @@ func NewLogger(ctx context.Context) Logger { } debuglog.Println("fallback to noopLogger: enableLogs disabled") - return &noopLogger{} // fallback: does nothing + return &noopLogger{} } func (l *sentryLogger) Write(p []byte) (int, error) { - // Avoid sending double newlines to Sentry msg := strings.TrimRight(string(p), "\n") l.Info().Emit(msg) return len(p), nil @@ -145,8 +144,6 @@ func (l *sentryLogger) log(ctx context.Context, level LogLevel, severity int, me for k, v := range entryAttrs { attrs[k] = v } - - // Set default attributes if release := l.client.options.Release; release != "" { attrs["sentry.release"] = Attribute{Value: release, Type: AttributeString} } @@ -194,7 +191,13 @@ func (l *sentryLogger) log(ctx context.Context, level LogLevel, severity int, me } if log 
!= nil { - l.client.batchLogger.logCh <- *log + if l.client.telemetryBuffer != nil { + if !l.client.telemetryBuffer.Add(log) { + debuglog.Print("Dropping event: log buffer full or category missing") + } + } else if l.client.batchLogger != nil { + l.client.batchLogger.logCh <- *log + } } if l.client.options.Debug { @@ -287,7 +290,7 @@ func (l *sentryLogger) Panic() LogEntry { level: LogLevelFatal, severity: LogSeverityFatal, attributes: make(map[string]Attribute), - shouldPanic: true, // this should panic instead of exit + shouldPanic: true, } } diff --git a/transport.go b/transport.go index d57e15517..487e54c17 100644 --- a/transport.go +++ b/transport.go @@ -788,7 +788,29 @@ func (a *internalAsyncTransportAdapter) Configure(options ClientOptions) { } func (a *internalAsyncTransportAdapter) SendEvent(event *Event) { - a.transport.SendEvent(event) + header := &protocol.EnvelopeHeader{EventID: string(event.EventID), SentAt: time.Now(), Sdk: &protocol.SdkInfo{Name: event.Sdk.Name, Version: event.Sdk.Version}} + if a.dsn != nil { + header.Dsn = a.dsn.String() + } + if header.EventID == "" { + header.EventID = protocol.GenerateEventID() + } + envelope := protocol.NewEnvelope(header) + item, err := event.ToEnvelopeItem() + if err != nil { + debuglog.Printf("Failed to convert event to envelope item: %v", err) + return + } + envelope.AddItem(item) + + for _, attachment := range event.Attachments { + attachmentItem := protocol.NewAttachmentItem(attachment.Filename, attachment.ContentType, attachment.Payload) + envelope.AddItem(attachmentItem) + } + + if err := a.transport.SendEnvelope(envelope); err != nil { + debuglog.Printf("Error sending envelope: %v", err) + } } func (a *internalAsyncTransportAdapter) Flush(timeout time.Duration) bool { diff --git a/util.go b/util.go index 3a6a33c8d..54524304e 100644 --- a/util.go +++ b/util.go @@ -1,8 +1,6 @@ package sentry import ( - "crypto/rand" - "encoding/hex" "encoding/json" "fmt" "os" @@ -11,18 +9,12 @@ import ( "time" "github.com/getsentry/sentry-go/internal/debuglog" + "github.com/getsentry/sentry-go/internal/protocol" exec "golang.org/x/sys/execabs" ) func uuid() string { - id := make([]byte, 16) - // Prefer rand.Read over rand.Reader, see https://go-review.googlesource.com/c/go/+/272326/. - _, _ = rand.Read(id) - id[6] &= 0x0F // clear version - id[6] |= 0x40 // set version to 4 (random uuid) - id[8] &= 0x3F // clear variant - id[8] |= 0x80 // set to IETF variant - return hex.EncodeToString(id) + return protocol.GenerateEventID() } func fileExists(fileName string) bool {
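
Reviewer note: a minimal usage sketch, not part of the diff above, showing how the pieces added here (Storage, NewRingBuffer, NewBuffer, Add/Flush/Close, and the Scheduler started by NewBuffer) fit together. It assumes the file lives inside the module (the internal packages are not importable from outside), uses MockTelemetryTransport from internal/testutils in place of a real transport, and the sketchItem type is hypothetical, mirroring bwItem from buffer_wrapper_test.go.

package telemetry

import (
	"time"

	"github.com/getsentry/sentry-go/internal/protocol"
	"github.com/getsentry/sentry-go/internal/ratelimit"
	"github.com/getsentry/sentry-go/internal/testutils"
)

// sketchItem is a hypothetical EnvelopeItemConvertible, mirroring bwItem in buffer_wrapper_test.go.
type sketchItem struct{ id string }

func (s sketchItem) ToEnvelopeItem() (*protocol.EnvelopeItem, error) {
	return &protocol.EnvelopeItem{
		Header:  &protocol.EnvelopeItemHeader{Type: protocol.EnvelopeItemTypeEvent},
		Payload: []byte(`{"message":"ok"}`),
	}, nil
}
func (s sketchItem) GetCategory() ratelimit.Category              { return ratelimit.CategoryError }
func (s sketchItem) GetEventID() string                           { return s.id }
func (s sketchItem) GetSdkInfo() *protocol.SdkInfo                { return &protocol.SdkInfo{Name: "sketch", Version: "0.0.0"} }
func (s sketchItem) GetDynamicSamplingContext() map[string]string { return nil }

// sketchUsage wires one RingBuffer per category into a Buffer. Errors flush one
// item at a time (batch size 1, no timeout); logs are batched (100 items or 5s).
// The Scheduler started by NewBuffer drains the buffers in a weighted round-robin
// cycle, so higher-priority categories get more slots per cycle.
func sketchUsage() {
	storage := map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{
		ratelimit.CategoryError: NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryError, 100, OverflowPolicyDropOldest, 1, 0),
		ratelimit.CategoryLog:   NewRingBuffer[protocol.EnvelopeItemConvertible](ratelimit.CategoryLog, 100, OverflowPolicyDropOldest, 100, 5*time.Second),
	}

	transport := &testutils.MockTelemetryTransport{} // stands in for a real TelemetryTransport
	sdk := &protocol.SdkInfo{Name: "sketch", Version: "0.0.0"}

	// NewBuffer constructs and starts the Scheduler internally.
	buf := NewBuffer(storage, transport, &protocol.Dsn{}, sdk)

	_ = buf.Add(sketchItem{id: "1"}) // routed to the CategoryError buffer; the scheduler is signalled
	_ = buf.Flush(2 * time.Second)   // drain all buffers and flush the transport
	buf.Close(2 * time.Second)       // flush, stop the scheduler, release resources
}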