Skip to content

Commit a6f7cdd

Browse files
authored
skip log & metric enrichment (#908)
1 parent 1da3fff commit a6f7cdd

20 files changed

+448
-16
lines changed

processor/elasticapmprocessor/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,13 @@ The processor enriches traces, metrics, and logs with elastic specific requireme
2222
processors:
2323
elasticapm:
2424
```
25+
26+
### Conditionally enrich logs and metrics only for ECS mapping mode
27+
28+
When `skip_enrichment` is set to `true`, logs and metrics are only enriched when the `x-elastic-mapping-mode` metadata is set to `ecs`. Traces are always enriched regardless of this setting. This defaults to `false` for backwards compatibility (always enrich).
29+
30+
```yaml
31+
processors:
32+
elasticapm:
33+
skip_enrichment: true
34+
```

processor/elasticapmprocessor/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,10 @@ import "github.com/elastic/opentelemetry-lib/enrichments/config"
2121

2222
type Config struct {
2323
config.Config `mapstructure:",squash"`
24+
25+
// SkipEnrichment controls whether enrichment should be skipped for logs and metrics
26+
// when the mapping mode is not "ecs". When true, logs and metrics are only enriched when
27+
// the x-elastic-mapping-mode metadata is set to "ecs". Traces are always enriched regardless
28+
// of this setting. Defaults to false for backwards compatibility (always enrich).
29+
SkipEnrichment bool `mapstructure:"skip_enrichment"`
2430
}

processor/elasticapmprocessor/processor.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,18 @@ type LogProcessor struct {
9292
component.StartFunc
9393
component.ShutdownFunc
9494

95-
next consumer.Logs
96-
enricher *enrichments.Enricher
97-
logger *zap.Logger
95+
next consumer.Logs
96+
enricher *enrichments.Enricher
97+
logger *zap.Logger
98+
skipEnrichment bool
9899
}
99100

100101
func newLogProcessor(cfg *Config, next consumer.Logs, logger *zap.Logger) *LogProcessor {
101102
return &LogProcessor{
102-
next: next,
103-
logger: logger,
104-
enricher: enrichments.NewEnricher(cfg.Config),
103+
next: next,
104+
logger: logger,
105+
enricher: enrichments.NewEnricher(cfg.Config),
106+
skipEnrichment: cfg.SkipEnrichment,
105107
}
106108
}
107109

@@ -113,16 +115,18 @@ type MetricProcessor struct {
113115
component.StartFunc
114116
component.ShutdownFunc
115117

116-
next consumer.Metrics
117-
enricher *enrichments.Enricher
118-
logger *zap.Logger
118+
next consumer.Metrics
119+
enricher *enrichments.Enricher
120+
logger *zap.Logger
121+
skipEnrichment bool
119122
}
120123

121124
func newMetricProcessor(cfg *Config, next consumer.Metrics, logger *zap.Logger) *MetricProcessor {
122125
return &MetricProcessor{
123-
next: next,
124-
logger: logger,
125-
enricher: enrichments.NewEnricher(cfg.Config),
126+
next: next,
127+
logger: logger,
128+
enricher: enrichments.NewEnricher(cfg.Config),
129+
skipEnrichment: cfg.SkipEnrichment,
126130
}
127131
}
128132

@@ -131,7 +135,8 @@ func (p *MetricProcessor) Capabilities() consumer.Capabilities {
131135
}
132136

133137
func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
134-
if isECS(ctx) {
138+
ecsMode := isECS(ctx)
139+
if ecsMode {
135140
resourceMetrics := md.ResourceMetrics()
136141
for i := 0; i < resourceMetrics.Len(); i++ {
137142
resourceMetric := resourceMetrics.At(i)
@@ -141,12 +146,17 @@ func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics
141146
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
142147
}
143148
}
144-
p.enricher.EnrichMetrics(md)
149+
// When skipEnrichment is true, only enrich when mapping mode is ecs
150+
// When skipEnrichment is false (default), always enrich (backwards compatible)
151+
if !p.skipEnrichment || ecsMode {
152+
p.enricher.EnrichMetrics(md)
153+
}
145154
return p.next.ConsumeMetrics(ctx, md)
146155
}
147156

148157
func (p *LogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
149-
if isECS(ctx) {
158+
ecsMode := isECS(ctx)
159+
if ecsMode {
150160
resourceLogs := ld.ResourceLogs()
151161
for i := 0; i < resourceLogs.Len(); i++ {
152162
resourceLog := resourceLogs.At(i)
@@ -157,6 +167,10 @@ func (p *LogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
157167
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
158168
}
159169
}
160-
p.enricher.EnrichLogs(ld)
170+
// When skipEnrichment is true, only enrich when mapping mode is ecs
171+
// When skipEnrichment is false (default), always enrich (backwards compatible)
172+
if !p.skipEnrichment || ecsMode {
173+
p.enricher.EnrichLogs(ld)
174+
}
161175
return p.next.ConsumeLogs(ctx, ld)
162176
}

processor/elasticapmprocessor/processor_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"testing"
2525

2626
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
28+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
2729
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest"
2830
"github.com/stretchr/testify/assert"
2931
"github.com/stretchr/testify/require"
@@ -118,3 +120,137 @@ func TestProcessorECS(t *testing.T) {
118120
}
119121
assert.NoError(t, ptracetest.CompareTraces(expectedTraces, actual))
120122
}
123+
124+
// TestSkipEnrichmentLogs tests that logs are only enriched when skipEnrichment is false or when mapping mode is ecs
125+
func TestSkipEnrichmentLogs(t *testing.T) {
126+
testCases := []struct {
127+
name string
128+
skipEnrichment bool
129+
mappingMode string
130+
}{
131+
{
132+
name: "logs_false",
133+
skipEnrichment: false,
134+
mappingMode: "",
135+
},
136+
{
137+
name: "logs_false_ecs",
138+
skipEnrichment: false,
139+
mappingMode: "ecs",
140+
},
141+
{
142+
name: "logs_true_ecs",
143+
skipEnrichment: true,
144+
mappingMode: "ecs",
145+
},
146+
{
147+
name: "logs_true_no_ecs",
148+
skipEnrichment: true,
149+
mappingMode: "",
150+
},
151+
}
152+
153+
for _, tc := range testCases {
154+
t.Run(tc.name, func(t *testing.T) {
155+
ctx := context.Background()
156+
if tc.mappingMode != "" {
157+
ctx = client.NewContext(ctx, client.Info{
158+
Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {tc.mappingMode}}),
159+
})
160+
}
161+
162+
factory := NewFactory()
163+
settings := processortest.NewNopSettings(metadata.Type)
164+
settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel))
165+
next := &consumertest.LogsSink{}
166+
167+
cfg := createDefaultConfig().(*Config)
168+
cfg.SkipEnrichment = tc.skipEnrichment
169+
170+
lp, err := factory.CreateLogs(ctx, settings, cfg, next)
171+
require.NoError(t, err)
172+
173+
dir := filepath.Join("testdata", "skip_enrichment")
174+
inputLogs, err := golden.ReadLogs(filepath.Join(dir, tc.name+"_input.yaml"))
175+
require.NoError(t, err)
176+
177+
outputFile := filepath.Join(dir, tc.name+"_output.yaml")
178+
require.NoError(t, lp.ConsumeLogs(ctx, inputLogs))
179+
actual := next.AllLogs()[0]
180+
if *update {
181+
err := golden.WriteLogs(t, outputFile, actual)
182+
assert.NoError(t, err)
183+
}
184+
expectedLogs, err := golden.ReadLogs(outputFile)
185+
require.NoError(t, err)
186+
assert.NoError(t, plogtest.CompareLogs(expectedLogs, actual))
187+
})
188+
}
189+
}
190+
191+
// TestSkipEnrichmentMetrics tests that metrics are only enriched when skipEnrichment is false or when mapping mode is ecs
192+
func TestSkipEnrichmentMetrics(t *testing.T) {
193+
testCases := []struct {
194+
name string
195+
skipEnrichment bool
196+
mappingMode string
197+
}{
198+
{
199+
name: "metrics_false",
200+
skipEnrichment: false,
201+
mappingMode: "",
202+
},
203+
{
204+
name: "metrics_false_ecs",
205+
skipEnrichment: false,
206+
mappingMode: "ecs",
207+
},
208+
{
209+
name: "metrics_true_ecs",
210+
skipEnrichment: true,
211+
mappingMode: "ecs",
212+
},
213+
{
214+
name: "metrics_true_no_ecs",
215+
skipEnrichment: true,
216+
mappingMode: "",
217+
},
218+
}
219+
220+
for _, tc := range testCases {
221+
t.Run(tc.name, func(t *testing.T) {
222+
ctx := context.Background()
223+
if tc.mappingMode != "" {
224+
ctx = client.NewContext(ctx, client.Info{
225+
Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {tc.mappingMode}}),
226+
})
227+
}
228+
229+
factory := NewFactory()
230+
settings := processortest.NewNopSettings(metadata.Type)
231+
settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel))
232+
next := &consumertest.MetricsSink{}
233+
234+
cfg := createDefaultConfig().(*Config)
235+
cfg.SkipEnrichment = tc.skipEnrichment
236+
237+
mp, err := factory.CreateMetrics(ctx, settings, cfg, next)
238+
require.NoError(t, err)
239+
240+
dir := filepath.Join("testdata", "skip_enrichment")
241+
inputMetrics, err := golden.ReadMetrics(filepath.Join(dir, tc.name+"_input.yaml"))
242+
require.NoError(t, err)
243+
244+
outputFile := filepath.Join(dir, tc.name+"_output.yaml")
245+
require.NoError(t, mp.ConsumeMetrics(ctx, inputMetrics))
246+
actual := next.AllMetrics()[0]
247+
if *update {
248+
err := golden.WriteMetrics(t, outputFile, actual)
249+
assert.NoError(t, err)
250+
}
251+
expectedMetrics, err := golden.ReadMetrics(outputFile)
252+
require.NoError(t, err)
253+
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actual, pmetrictest.IgnoreMetricsOrder(), pmetrictest.IgnoreResourceMetricsOrder()))
254+
})
255+
}
256+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
scopeLogs:
8+
- logRecords:
9+
- body:
10+
stringValue: test log message
11+
timeUnixNano: "0"
12+
scope: {}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
- key: data_stream.type
8+
value:
9+
stringValue: logs
10+
- key: data_stream.dataset
11+
value:
12+
stringValue: apm
13+
- key: data_stream.namespace
14+
value:
15+
stringValue: default
16+
- key: agent.name
17+
value:
18+
stringValue: otlp
19+
scopeLogs:
20+
- logRecords:
21+
- body:
22+
stringValue: test log message
23+
scope: {}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
scopeLogs:
8+
- logRecords:
9+
- body:
10+
stringValue: test log message
11+
timeUnixNano: "0"
12+
scope: {}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
- key: agent.name
8+
value:
9+
stringValue: otlp
10+
- key: agent.version
11+
value:
12+
stringValue: unknown
13+
scopeLogs:
14+
- logRecords:
15+
- body:
16+
stringValue: test log message
17+
scope: {}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
scopeLogs:
8+
- logRecords:
9+
- body:
10+
stringValue: test log message
11+
timeUnixNano: "0"
12+
scope: {}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
- key: data_stream.type
8+
value:
9+
stringValue: logs
10+
- key: data_stream.dataset
11+
value:
12+
stringValue: apm
13+
- key: data_stream.namespace
14+
value:
15+
stringValue: default
16+
- key: agent.name
17+
value:
18+
stringValue: otlp
19+
scopeLogs:
20+
- logRecords:
21+
- body:
22+
stringValue: test log message
23+
scope: {}

0 commit comments

Comments
 (0)