Skip to content

Commit f9d2bdb

Browse files
authored
stats/otel: Add grpc.lb.backend_service label to wrr metrics (A89) (#8737)
Address: https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md This PR makes available the `backend_service` (cluster name) label which is decided in clusterimpl (since we are pre-A75). It is added to WRR per-call metrics. RELEASE NOTES: * stats/otel: add backend service label to wrr metrics as part of A89
1 parent 647162c commit f9d2bdb

File tree

5 files changed

+46
-18
lines changed

5 files changed

+46
-18
lines changed

balancer/weightedroundrobin/balancer.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ var (
6262
Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
6363
Unit: "{update}",
6464
Labels: []string{"grpc.target"},
65-
OptionalLabels: []string{"grpc.lb.locality"},
65+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
6666
Default: false,
6767
})
6868

@@ -71,7 +71,7 @@ var (
7171
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
7272
Unit: "{endpoint}",
7373
Labels: []string{"grpc.target"},
74-
OptionalLabels: []string{"grpc.lb.locality"},
74+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
7575
Default: false,
7676
})
7777

@@ -80,15 +80,15 @@ var (
8080
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
8181
Unit: "{endpoint}",
8282
Labels: []string{"grpc.target"},
83-
OptionalLabels: []string{"grpc.lb.locality"},
83+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
8484
Default: false,
8585
})
8686
endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
8787
Name: "grpc.lb.wrr.endpoint_weights",
8888
Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
8989
Unit: "{endpoint}",
9090
Labels: []string{"grpc.target"},
91-
OptionalLabels: []string{"grpc.lb.locality"},
91+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
9292
Default: false,
9393
})
9494
)
@@ -173,6 +173,7 @@ func (b *wrrBalancer) updateEndpointsLocked(endpoints []resolver.Endpoint) {
173173
metricsRecorder: b.metricsRecorder,
174174
target: b.target,
175175
locality: b.locality,
176+
clusterName: b.clusterName,
176177
}
177178
for _, addr := range endpoint.Addresses {
178179
b.addressWeights.Set(addr, ew)
@@ -211,6 +212,7 @@ type wrrBalancer struct {
211212
mu sync.Mutex
212213
cfg *lbConfig // active config
213214
locality string
215+
clusterName string
214216
stopPicker *grpcsync.Event
215217
addressWeights *resolver.AddressMapV2[*endpointWeight]
216218
endpointToWeight *resolver.EndpointMap[*endpointWeight]
@@ -231,6 +233,7 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
231233
b.mu.Lock()
232234
b.cfg = cfg
233235
b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState)
236+
b.clusterName = backendServiceFromState(ccs.ResolverState)
234237
b.updateEndpointsLocked(ccs.ResolverState.Endpoints)
235238
b.mu.Unlock()
236239

@@ -288,6 +291,7 @@ func (b *wrrBalancer) UpdateState(state balancer.State) {
288291
metricsRecorder: b.metricsRecorder,
289292
locality: b.locality,
290293
target: b.target,
294+
clusterName: b.clusterName,
291295
}
292296

293297
b.stopPicker = grpcsync.NewEvent()
@@ -420,6 +424,7 @@ type picker struct {
420424
// The following fields are immutable.
421425
target string
422426
locality string
427+
clusterName string
423428
metricsRecorder estats.MetricsRecorder
424429
}
425430

@@ -499,6 +504,7 @@ type endpointWeight struct {
499504
target string
500505
metricsRecorder estats.MetricsRecorder
501506
locality string
507+
clusterName string
502508

503509
// The following fields are only accessed on calls into the LB policy, and
504510
// do not need a mutex.
@@ -602,14 +608,14 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
602608

603609
if recordMetrics {
604610
defer func() {
605-
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality)
611+
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality, w.clusterName)
606612
}()
607613
}
608614

609615
// The endpoint has not received a load report (i.e. just turned READY with
610616
// no load report).
611617
if w.lastUpdated.Equal(time.Time{}) {
612-
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
618+
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
613619
return 0
614620
}
615621

@@ -618,7 +624,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
618624
// start getting data again in the future, and return 0.
619625
if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
620626
if recordMetrics {
621-
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
627+
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
622628
}
623629
w.nonEmptySince = time.Time{}
624630
return 0
@@ -627,10 +633,27 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
627633
// If we don't have at least blackoutPeriod worth of data, return 0.
628634
if blackoutPeriod != 0 && (w.nonEmptySince.Equal(time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
629635
if recordMetrics {
630-
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
636+
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.clusterName)
631637
}
632638
return 0
633639
}
634640

635641
return w.weightVal
636642
}
643+
644+
type backendServiceKey struct{}
645+
646+
// SetBackendService stores the backendService on the resolver state so
647+
// that it can be used later as a label in wrr metrics.
648+
func SetBackendService(state resolver.State, backendService string) resolver.State {
649+
state.Attributes = state.Attributes.WithValue(backendServiceKey{}, backendService)
650+
return state
651+
}
652+
653+
// getBackendServiceFromState retrieves the cluster name stored as an attribute
654+
// in the resolver state.
655+
func backendServiceFromState(state resolver.State) string {
656+
v := state.Attributes.Value(backendServiceKey{})
657+
name, _ := v.(string)
658+
return name
659+
}

balancer/weightedroundrobin/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {
3939
}
4040
if n == 1 {
4141
if recordMetrics {
42-
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
42+
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
4343
}
4444
return &rrScheduler{numSCs: 1, inc: p.inc}
4545
}
@@ -58,7 +58,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {
5858

5959
if numZero >= n-1 {
6060
if recordMetrics {
61-
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
61+
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
6262
}
6363
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
6464
}

internal/xds/balancer/clusterimpl/clusterimpl.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"time"
3434

3535
"google.golang.org/grpc/balancer"
36+
"google.golang.org/grpc/balancer/weightedroundrobin"
3637
"google.golang.org/grpc/connectivity"
3738
"google.golang.org/grpc/internal"
3839
"google.golang.org/grpc/internal/balancer/gracefulswitch"
@@ -297,7 +298,7 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
297298

298299
// Addresses and sub-balancer config are sent to sub-balancer.
299300
err = b.child.UpdateClientConnState(balancer.ClientConnState{
300-
ResolverState: s.ResolverState,
301+
ResolverState: weightedroundrobin.SetBackendService(s.ResolverState, b.clusterName),
301302
BalancerConfig: parsedCfg,
302303
})
303304

stats/opentelemetry/csm/observability_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,8 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re
434434
"csm.service_name": "service_name_val",
435435
"csm.service_namespace_name": "service_namespace_val",
436436

437-
"grpc.lb.locality": "grpc.lb.locality_val",
437+
"grpc.lb.locality": "grpc.lb.locality_val",
438+
"grpc.lb.backend_service": "grpc.lb.backend_service_val",
438439
},
439440
})
440441

@@ -469,7 +470,7 @@ func (s) TestXDSLabels(t *testing.T) {
469470
MetricsOptions: opentelemetry.MetricsOptions{
470471
MeterProvider: provider,
471472
Metrics: opentelemetry.DefaultMetrics(),
472-
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"},
473+
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality", "grpc.lb.backend_service"},
473474
},
474475
}, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
475476
if err := ss.Start(nil, dopts...); err != nil {
@@ -498,6 +499,7 @@ func (s) TestXDSLabels(t *testing.T) {
498499
serviceNameAttr := attribute.String("csm.service_name", "service_name_val")
499500
serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val")
500501
localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val")
502+
backendServiceAttr := attribute.String("grpc.lb.backend_service", "grpc.lb.backend_service_val")
501503
meshIDAttr := attribute.String("csm.mesh_id", "unknown")
502504
workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown")
503505
remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown")
@@ -510,6 +512,7 @@ func (s) TestXDSLabels(t *testing.T) {
510512
serviceNameAttr,
511513
serviceNamespaceAttr,
512514
localityAttr,
515+
backendServiceAttr,
513516
meshIDAttr,
514517
workloadCanonicalServiceAttr,
515518
remoteWorkloadTypeAttr,

stats/opentelemetry/e2e_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ func (s) TestWRRMetrics(t *testing.T) {
673673
mo := opentelemetry.MetricsOptions{
674674
MeterProvider: provider,
675675
Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"),
676-
OptionalLabels: []string{"grpc.lb.locality"},
676+
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
677677
}
678678

679679
target := fmt.Sprintf("xds:///%s", serviceName)
@@ -699,6 +699,7 @@ func (s) TestWRRMetrics(t *testing.T) {
699699

700700
targetAttr := attribute.String("grpc.target", target)
701701
localityAttr := attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`)
702+
backendServiceAttr := attribute.String("grpc.lb.backend_service", clusterName)
702703

703704
wantMetrics := []metricdata.Metrics{
704705
{
@@ -708,7 +709,7 @@ func (s) TestWRRMetrics(t *testing.T) {
708709
Data: metricdata.Sum[int64]{
709710
DataPoints: []metricdata.DataPoint[int64]{
710711
{
711-
Attributes: attribute.NewSet(targetAttr, localityAttr),
712+
Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
712713
Value: 1, // value ignored
713714
},
714715
},
@@ -724,7 +725,7 @@ func (s) TestWRRMetrics(t *testing.T) {
724725
Data: metricdata.Sum[int64]{
725726
DataPoints: []metricdata.DataPoint[int64]{
726727
{
727-
Attributes: attribute.NewSet(targetAttr, localityAttr),
728+
Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
728729
Value: 1, // value ignored
729730
},
730731
},
@@ -739,7 +740,7 @@ func (s) TestWRRMetrics(t *testing.T) {
739740
Data: metricdata.Histogram[float64]{
740741
DataPoints: []metricdata.HistogramDataPoint[float64]{
741742
{
742-
Attributes: attribute.NewSet(targetAttr, localityAttr),
743+
Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
743744
},
744745
},
745746
Temporality: metricdata.CumulativeTemporality,
@@ -761,7 +762,7 @@ func (s) TestWRRMetrics(t *testing.T) {
761762
Data: metricdata.Sum[int64]{
762763
DataPoints: []metricdata.DataPoint[int64]{
763764
{
764-
Attributes: attribute.NewSet(targetAttr, localityAttr),
765+
Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr),
765766
Value: 1, // value ignored
766767
},
767768
},

0 commit comments

Comments
 (0)