From 43e540d7bf6331c2e31dab1c9e1ad2bb72e4fe04 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 13 Oct 2025 08:55:23 +0530 Subject: [PATCH 1/8] fix: fix race conditions in the sensor.go file --- sensor.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sensor.go b/sensor.go index 4e2084293..5e9d92923 100644 --- a/sensor.go +++ b/sensor.go @@ -265,6 +265,9 @@ func StartMetrics(options *Options) { // Ready returns whether the Instana collector is ready to collect and send data to the agent func Ready() bool { + muSensor.Lock() + defer muSensor.Unlock() + if sensor == nil { return false } @@ -276,6 +279,9 @@ func Ready() bool { // graceful service shutdown and not recommended for intermittent use. Once Flush() is called, it's not guaranteed // that collector remains in operational state. func Flush(ctx context.Context) error { + muSensor.Lock() + defer muSensor.Unlock() + if sensor == nil { return nil } From 0c47efce1723d3d14ac40b6796f95f6b6ce0aa0f Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 13 Oct 2025 08:55:34 +0530 Subject: [PATCH 2/8] fix: fix race conditions in the recorder.go file --- recorder.go | 58 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/recorder.go b/recorder.go index 6f294d6e8..19627a6c7 100644 --- a/recorder.go +++ b/recorder.go @@ -32,16 +32,21 @@ type Recorder struct { func NewRecorder() *Recorder { r := &Recorder{} - ticker := time.NewTicker(1 * time.Second) + // Create a reference to r that will be captured by the goroutine + recorder := r + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() // Ensure ticker is stopped when goroutine exits + for range ticker.C { if isAgentReady() { - go func() { + go func(*Recorder) { if err := r.Flush(context.Background()); err != nil { sensor.logger.Error("failed to flush the spans: ", err.Error()) } - }() + }(r) } } }() @@ -60,32 +65,51 @@ func NewTestRecorder() *Recorder { 
// RecordSpan accepts spans to be recorded and added to the span queue // for eventual reporting to the host agent. func (r *Recorder) RecordSpan(span *spanS) { + // Get all sensor-related values under a single lock to minimize contention + muSensor.Lock() + if sensor == nil { + muSensor.Unlock() + return + } + + agentReady := sensor.Agent().Ready() + maxBufferedSpans := sensor.options.MaxBufferedSpans + forceTransmissionAt := sensor.options.ForceTransmissionStartingAt + logger := sensor.logger + muSensor.Unlock() + // If we're not announced and not in test mode then just // return - if !r.testMode && !sensor.Agent().Ready() { + if !r.testMode && !agentReady { return } r.Lock() defer r.Unlock() - if len(r.spans) == sensor.options.MaxBufferedSpans { + if len(r.spans) == maxBufferedSpans { r.spans = r.spans[1:] } r.spans = append(r.spans, newSpan(span)) - if r.testMode || !sensor.Agent().Ready() { + if r.testMode || !agentReady { return } - if len(r.spans) >= sensor.options.ForceTransmissionStartingAt { - sensor.logger.Debug("forcing ", len(r.spans), "span(s) to the agent") - go func() { - if err := r.Flush(context.Background()); err != nil { - sensor.logger.Error("failed to flush the spans: ", err.Error()) + if len(r.spans) >= forceTransmissionAt { + logger.Debug("forcing ", len(r.spans), "span(s) to the agent") + // Create a reference to r for this goroutine to avoid race conditions + rec := r + go func(recorder *Recorder) { + if err := recorder.Flush(context.Background()); err != nil { + muSensor.Lock() + if sensor != nil { + sensor.logger.Error("failed to flush the spans: ", err.Error()) + } + muSensor.Unlock() } - }() + }(rec) } } @@ -119,7 +143,15 @@ func (r *Recorder) Flush(ctx context.Context) error { return nil } - if err := sensor.Agent().SendSpans(spansToSend); err != nil { + muSensor.Lock() + if sensor == nil { + muSensor.Unlock() + return nil + } + agent := sensor.Agent() + muSensor.Unlock() + + if err := agent.SendSpans(spansToSend); err != nil { 
r.Lock() defer r.Unlock() From 19307ccd7467d64cde6ae609ced65844a501e94a Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 13 Oct 2025 12:27:27 +0530 Subject: [PATCH 3/8] fix: fix race conditions in the agent.go file --- agent.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/agent.go b/agent.go index 5fef356b6..f004a06c4 100644 --- a/agent.go +++ b/agent.go @@ -144,6 +144,9 @@ func (agent *agentS) Ready() bool { // SendMetrics sends collected entity data to the host agent func (agent *agentS) SendMetrics(data acceptor.Metrics) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + pid, err := strconv.Atoi(agent.agentComm.from.EntityID) if err != nil && agent.agentComm.from.EntityID != "" { agent.logger.Debug("agent got malformed PID %q", agent.agentComm.from.EntityID) @@ -159,7 +162,11 @@ func (agent *agentS) SendMetrics(data acceptor.Metrics) error { } agent.logger.Error("failed to send metrics to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() return err } @@ -186,6 +193,9 @@ func (agent *agentS) SendEvent(event *EventData) error { // SendSpans sends collected spans to the host agent func (agent *agentS) SendSpans(spans []Span) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + for i := range spans { spans[i].From = agent.agentComm.from } @@ -202,7 +212,11 @@ func (agent *agentS) SendSpans(spans []Span) error { return nil } else { agent.logger.Error("failed to send spans to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() } return err @@ -221,6 +235,9 @@ type hostAgentProfile struct { // SendProfiles sends profile data to the agent func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + agentProfiles := 
make([]hostAgentProfile, 0, len(profiles)) for _, p := range profiles { agentProfiles = append(agentProfiles, hostAgentProfile{p, agent.agentComm.from.EntityID}) @@ -233,7 +250,11 @@ func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error { } agent.logger.Error("failed to send profile data to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() return err } From aa37a99fa9759cc583e8a8e2488bf2cbb474124f Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Tue, 14 Oct 2025 09:10:32 +0530 Subject: [PATCH 4/8] fix: fix unit test failure --- delayed_spans.go | 7 ++++++- delayed_spans_test.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/delayed_spans.go b/delayed_spans.go index 3e3080dea..9e37f135f 100644 --- a/delayed_spans.go +++ b/delayed_spans.go @@ -42,7 +42,12 @@ func (ds *delayedSpans) flush() { continue } - if sensor.Agent().Ready() { + // Get agent ready status under proper synchronization + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if agentReady { s.tracer.recorder.RecordSpan(s) } else { ds.append(s) diff --git a/delayed_spans_test.go b/delayed_spans_test.go index fa5f463bb..b36031aa8 100644 --- a/delayed_spans_test.go +++ b/delayed_spans_test.go @@ -56,7 +56,7 @@ func TestPartiallyFlushDelayedSpans(t *testing.T) { notReadyAfter := maxDelayedSpans / 10 sensor.agent = &eventuallyNotReadyClient{ - notReadyAfter: uint64(notReadyAfter), + notReadyAfter: uint64(notReadyAfter * 2), } delayed.flush() From 68aa7267d981c5898161e355ca29325db5bd2ef3 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Tue, 14 Oct 2025 10:41:13 +0530 Subject: [PATCH 5/8] fix: fix race conditions --- delayed_spans.go | 12 ++++++++++-- event.go | 6 +++++- recorder.go | 2 +- sensor.go | 14 ++++++++++++-- span.go | 7 ++++++- tracer.go | 7 +++++++ 6 files changed, 41 
insertions(+), 7 deletions(-) diff --git a/delayed_spans.go b/delayed_spans.go index 9e37f135f..acdbfa902 100644 --- a/delayed_spans.go +++ b/delayed_spans.go @@ -33,12 +33,20 @@ func (ds *delayedSpans) flush() { case s := <-ds.spans: t, ok := s.Tracer().(Tracer) if !ok { - sensor.logger.Debug("span tracer has unexpected type") + muSensor.Lock() + if sensor != nil { + sensor.logger.Debug("span tracer has unexpected type") + } + muSensor.Unlock() continue } if err := ds.processSpan(s, t.Options()); err != nil { - sensor.logger.Debug("error while processing spans:", err.Error()) + muSensor.Lock() + if sensor != nil { + sensor.logger.Debug("error while processing spans:", err.Error()) + } + muSensor.Unlock() continue } diff --git a/event.go b/event.go index 1ae79ccc6..5877a30d1 100644 --- a/event.go +++ b/event.go @@ -80,6 +80,10 @@ func sendEvent(event *EventData) { // we do fire & forget here, because the whole pid dance isn't necessary to send events go func() { - _ = safeSensor().Agent().SendEvent(event) + muSensor.Lock() + if sensor != nil { + _ = sensor.Agent().SendEvent(event) + } + muSensor.Unlock() }() } diff --git a/recorder.go b/recorder.go index 19627a6c7..9bfee1d40 100644 --- a/recorder.go +++ b/recorder.go @@ -72,7 +72,7 @@ func (r *Recorder) RecordSpan(span *spanS) { return } - agentReady := sensor.Agent().Ready() + agentReady := sensor != nil && sensor.Agent().Ready() maxBufferedSpans := sensor.options.MaxBufferedSpans forceTransmissionAt := sensor.options.ForceTransmissionStartingAt logger := sensor.logger diff --git a/sensor.go b/sensor.go index 5e9d92923..2b36214c6 100644 --- a/sensor.go +++ b/sensor.go @@ -233,13 +233,23 @@ func InitSensor(options *Options) { }) autoprofile.SetSendProfilesFunc(func(profiles []autoprofile.Profile) error { - if !sensor.Agent().Ready() { + // Get agent ready status under proper synchronization + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if !agentReady { return 
errors.New("sender not ready") } sensor.logger.Debug("sending profiles to agent") - return sensor.Agent().SendProfiles(profiles) + // Use the same lock for sending profiles + muSensor.Lock() + err := sensor.Agent().SendProfiles(profiles) + muSensor.Unlock() + + return err }) if _, ok := os.LookupEnv("INSTANA_AUTO_PROFILE"); ok || options.EnableAutoProfile { diff --git a/span.go b/span.go index f3faee6d5..45797cc61 100644 --- a/span.go +++ b/span.go @@ -86,7 +86,12 @@ func (r *spanS) FinishWithOptions(opts ot.FinishOptions) { r.Duration = duration if r.sendSpanToAgent() { - if sensor.Agent().Ready() { + // Get agent ready status under proper synchronization + muSensor.Lock() + agentReady := sensor != nil && sensor.Agent().Ready() + muSensor.Unlock() + + if agentReady { r.tracer.recorder.RecordSpan(r) } else { delayed.append(r) diff --git a/tracer.go b/tracer.go index 2f14f7d74..9dc1bfda2 100644 --- a/tracer.go +++ b/tracer.go @@ -135,5 +135,12 @@ func (r *tracerS) Flush(ctx context.Context) error { return err } + muSensor.Lock() + defer muSensor.Unlock() + + if sensor == nil { + return nil + } + return sensor.Agent().Flush(ctx) } From 775b4ac514c7461325ac879214280b245ebf9831 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Wed, 15 Oct 2025 14:44:49 +0530 Subject: [PATCH 6/8] chore: added extra unit test cases for recorder.go --- recorder.go | 17 ++++- recorder_test.go | 157 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 172 insertions(+), 2 deletions(-) diff --git a/recorder.go b/recorder.go index 9bfee1d40..70a12515c 100644 --- a/recorder.go +++ b/recorder.go @@ -138,19 +138,32 @@ func (r *Recorder) GetQueuedSpans() []Span { // Flush sends queued spans to the agent func (r *Recorder) Flush(ctx context.Context) error { - spansToSend := r.GetQueuedSpans() - if len(spansToSend) == 0 { + // For test mode, we don't want to actually send spans + if r.testMode { return nil } + // Check if agent is ready before getting and clearing spans 
muSensor.Lock() if sensor == nil { muSensor.Unlock() return nil } + agent := sensor.Agent() + agentReady := agent.Ready() muSensor.Unlock() + // If agent is not ready, don't flush spans + if !agentReady { + return nil + } + + spansToSend := r.GetQueuedSpans() + if len(spansToSend) == 0 { + return nil + } + if err := agent.SendSpans(spansToSend); err != nil { r.Lock() defer r.Unlock() diff --git a/recorder_test.go b/recorder_test.go index 7dad4ad79..6e14c98ec 100644 --- a/recorder_test.go +++ b/recorder_test.go @@ -4,9 +4,14 @@ package instana_test import ( + "context" + "fmt" "testing" + "time" instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/acceptor" + "github.com/instana/go-sensor/autoprofile" ot "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/stretchr/testify/assert" @@ -68,3 +73,155 @@ func TestRecorder_BatchSpan_Single(t *testing.T) { assert.Nil(t, spans[0].Batch) } + +func TestRecorder_Flush_EmptyQueue(t *testing.T) { + recorder := instana.NewTestRecorder() + + // Test flushing an empty queue + err := recorder.Flush(context.Background()) + assert.NoError(t, err) +} + +func TestRecorder_MaxBufferedSpans(t *testing.T) { + recorder := instana.NewTestRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: alwaysReadyClient{}, + Recorder: recorder, + MaxBufferedSpans: 3, // Set a small buffer size for testing + }) + defer instana.ShutdownCollector() + + // Create more spans than the buffer can hold + for i := 0; i < 5; i++ { + c.StartSpan(fmt.Sprintf("span-%d", i)).Finish() + } + + // Verify that only the most recent MaxBufferedSpans are kept + spans := recorder.GetQueuedSpans() + assert.Len(t, spans, 3) + + // Verify that only the most recent MaxBufferedSpans are kept + assert.Len(t, spans, 3) +} + +func TestRecorder_ForceTransmission(t *testing.T) { + // Create a mock agent client that tracks when spans are sent + mockAgent := &mockAgentClient{ + ready: true, + } + + recorder 
:= instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + MaxBufferedSpans: 10, + ForceTransmissionStartingAt: 2, // Force transmission after 2 spans + }) + defer instana.ShutdownCollector() + + // Create spans to trigger force transmission + for i := 0; i < 2; i++ { + c.StartSpan(fmt.Sprintf("span-%d", i)).Finish() + } + + // Give some time for the async flush to happen + time.Sleep(100 * time.Millisecond) + + // Verify that SendSpans was called + assert.True(t, mockAgent.spansSent, "Expected spans to be sent to the agent") +} + +// Mock agent client for testing +type mockAgentClient struct { + ready bool + spansSent bool +} + +func (m *mockAgentClient) Ready() bool { return m.ready } +func (m *mockAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (m *mockAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (m *mockAgentClient) SendSpans(spans []instana.Span) error { + m.spansSent = true + return nil +} +func (m *mockAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (m *mockAgentClient) Flush(context.Context) error { return nil } + +// alwaysReadyClient is already defined in instrumentation_http_test.go + +func TestRecorder_Flush_Error(t *testing.T) { + // Create a mock agent client that returns an error on SendSpans + mockAgent := &errorAgentClient{ + ready: true, + } + + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + // Create a span to be flushed + c.StartSpan("test-span").Finish() + + // Flush should return an error + err := recorder.Flush(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to send collected spans") + + // Verify that spans are put back in the queue + assert.Greater(t, recorder.QueuedSpansCount(), 0) +} + +// Mock agent client that returns 
an error on SendSpans +type errorAgentClient struct { + ready bool +} + +func (m *errorAgentClient) Ready() bool { return m.ready } +func (m *errorAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (m *errorAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (m *errorAgentClient) SendSpans(spans []instana.Span) error { return fmt.Errorf("mock error") } +func (m *errorAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (m *errorAgentClient) Flush(context.Context) error { return nil } + +// TestRecorder_Flush_AgentNotReady tests the behavior when the agent is not ready +func TestRecorder_Flush_AgentNotReady(t *testing.T) { + // Create a mock agent client that is not ready + mockAgent := &notReadyAgentClient{} + + // Use a regular recorder, not a test recorder + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + // Create a span to be flushed + c.StartSpan("test-span").Finish() + + // Wait a bit for the span to be processed + time.Sleep(100 * time.Millisecond) + + // Get the initial count + initialCount := recorder.QueuedSpansCount() + + // Flush should not return an error when agent is not ready + err := recorder.Flush(context.Background()) + assert.NoError(t, err) + + // Spans should still be in the queue when agent is not ready + assert.Equal(t, initialCount, recorder.QueuedSpansCount(), "Spans should remain in queue when agent is not ready") +} + +// Mock agent client that is never ready +type notReadyAgentClient struct{} + +func (notReadyAgentClient) Ready() bool { return false } +func (notReadyAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (notReadyAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (notReadyAgentClient) SendSpans(spans []instana.Span) error { return nil } +func (notReadyAgentClient) 
SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (notReadyAgentClient) Flush(context.Context) error { return nil } From a4a15aa2873c09701a97ad128f3496d42eb6cef2 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 27 Oct 2025 13:14:44 +0530 Subject: [PATCH 7/8] fix: fix race conditions - recorder.go --- recorder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/recorder.go b/recorder.go index 70a12515c..32397bca1 100644 --- a/recorder.go +++ b/recorder.go @@ -33,7 +33,6 @@ func NewRecorder() *Recorder { r := &Recorder{} // Create a reference to r that will be captured by the goroutine - recorder := r go func() { ticker := time.NewTicker(1 * time.Second) From 795acb8d95748d0ff279e7e796fe6607e8eaea95 Mon Sep 17 00:00:00 2001 From: Nithin Puthenveettil Date: Mon, 27 Oct 2025 14:28:24 +0530 Subject: [PATCH 8/8] fix: fix race conditions - code cleanup --- delayed_spans.go | 7 +------ sensor.go | 6 +----- span.go | 6 +----- 3 files changed, 3 insertions(+), 16 deletions(-) diff --git a/delayed_spans.go b/delayed_spans.go index acdbfa902..80d1d3e03 100644 --- a/delayed_spans.go +++ b/delayed_spans.go @@ -50,12 +50,7 @@ func (ds *delayedSpans) flush() { continue } - // Get agent ready status under proper synchronization - muSensor.Lock() - agentReady := sensor != nil && sensor.Agent().Ready() - muSensor.Unlock() - - if agentReady { + if isAgentReady() { s.tracer.recorder.RecordSpan(s) } else { ds.append(s) diff --git a/sensor.go b/sensor.go index 2b36214c6..586cf479a 100644 --- a/sensor.go +++ b/sensor.go @@ -233,12 +233,8 @@ func InitSensor(options *Options) { }) autoprofile.SetSendProfilesFunc(func(profiles []autoprofile.Profile) error { - // Get agent ready status under proper synchronization - muSensor.Lock() - agentReady := sensor != nil && sensor.Agent().Ready() - muSensor.Unlock() - if !agentReady { + if !isAgentReady() { return errors.New("sender not ready") } diff --git a/span.go b/span.go index 45797cc61..db50e8163 100644 --- 
a/span.go +++ b/span.go @@ -86,12 +86,8 @@ func (r *spanS) FinishWithOptions(opts ot.FinishOptions) { r.Duration = duration if r.sendSpanToAgent() { - // Get agent ready status under proper synchronization - muSensor.Lock() - agentReady := sensor != nil && sensor.Agent().Ready() - muSensor.Unlock() - if agentReady { + if isAgentReady() { r.tracer.recorder.RecordSpan(r) } else { delayed.append(r)