Skip to content

Commit 1737078

Browse files
authored
feat(translator): add store metric (#6406)
* add store metric Signed-off-by: Guy Daich <guy.daich@sap.com> * add naming helper, use runner.name, add test Signed-off-by: Guy Daich <guy.daich@sap.com>
1 parent b8ca958 commit 1737078

File tree

13 files changed

+210
-43
lines changed

13 files changed

+210
-43
lines changed

internal/gatewayapi/runner/runner.go

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (r *Runner) startWasmCache(ctx context.Context) {
124124
}
125125

126126
func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *resource.ControllerResources]) {
127-
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentGatewayAPIRunner), Message: "provider-resources"}, sub,
127+
message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.ProviderResourcesMessageName}, sub,
128128
func(update message.Update[string, *resource.ControllerResources], errChan chan error) {
129129
r.Logger.Info("received an update")
130130
val := update.Value
@@ -187,55 +187,87 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
187187
// Publish the IRs.
188188
// Also validate the ir before sending it.
189189
for key, val := range result.InfraIR {
190-
r.Logger.V(1).WithValues("infra-ir", key).Info(val.JSONString())
190+
r.Logger.V(1).WithValues(string(message.InfraIRMessageName), key).Info(val.JSONString())
191191
if err := val.Validate(); err != nil {
192192
r.Logger.Error(err, "unable to validate infra ir, skipped sending it")
193193
errChan <- err
194194
} else {
195-
r.InfraIR.Store(key, val)
195+
message.HandleStore(message.Metadata{
196+
Runner: r.Name(),
197+
Message: message.InfraIRMessageName,
198+
},
199+
key, val, &r.InfraIR.Map)
196200
newIRKeys = append(newIRKeys, key)
197201
}
198202
}
199203

200204
for key, val := range result.XdsIR {
201-
r.Logger.V(1).WithValues("xds-ir", key).Info(val.JSONString())
205+
r.Logger.V(1).WithValues(string(message.XDSIRMessageName), key).Info(val.JSONString())
202206
if err := val.Validate(); err != nil {
203207
r.Logger.Error(err, "unable to validate xds ir, skipped sending it")
204208
errChan <- err
205209
} else {
206-
r.XdsIR.Store(key, val)
210+
message.HandleStore(message.Metadata{
211+
Runner: r.Name(),
212+
Message: message.XDSIRMessageName,
213+
},
214+
key, val, &r.XdsIR.Map)
207215
}
208216
}
209217

210218
// Update Status
211219
for _, gateway := range result.Gateways {
212220
key := utils.NamespacedName(gateway)
213-
r.ProviderResources.GatewayStatuses.Store(key, &gateway.Status)
221+
message.HandleStore(message.Metadata{
222+
Runner: r.Name(),
223+
Message: message.GatewayStatusMessageName,
224+
},
225+
key, &gateway.Status, &r.ProviderResources.GatewayStatuses)
214226
delete(statusesToDelete.GatewayStatusKeys, key)
215227
}
216228
for _, httpRoute := range result.HTTPRoutes {
217229
key := utils.NamespacedName(httpRoute)
218-
r.ProviderResources.HTTPRouteStatuses.Store(key, &httpRoute.Status)
230+
message.HandleStore(message.Metadata{
231+
Runner: r.Name(),
232+
Message: message.HTTPRouteStatusMessageName,
233+
},
234+
key, &httpRoute.Status, &r.ProviderResources.HTTPRouteStatuses)
219235
delete(statusesToDelete.HTTPRouteStatusKeys, key)
220236
}
221237
for _, grpcRoute := range result.GRPCRoutes {
222238
key := utils.NamespacedName(grpcRoute)
223-
r.ProviderResources.GRPCRouteStatuses.Store(key, &grpcRoute.Status)
239+
message.HandleStore(message.Metadata{
240+
Runner: r.Name(),
241+
Message: message.GRPCRouteStatusMessageName,
242+
},
243+
key, &grpcRoute.Status, &r.ProviderResources.GRPCRouteStatuses)
224244
delete(statusesToDelete.GRPCRouteStatusKeys, key)
225245
}
226246
for _, tlsRoute := range result.TLSRoutes {
227247
key := utils.NamespacedName(tlsRoute)
228-
r.ProviderResources.TLSRouteStatuses.Store(key, &tlsRoute.Status)
248+
message.HandleStore(message.Metadata{
249+
Runner: r.Name(),
250+
Message: message.TLSRouteStatusMessageName,
251+
},
252+
key, &tlsRoute.Status, &r.ProviderResources.TLSRouteStatuses)
229253
delete(statusesToDelete.TLSRouteStatusKeys, key)
230254
}
231255
for _, tcpRoute := range result.TCPRoutes {
232256
key := utils.NamespacedName(tcpRoute)
233-
r.ProviderResources.TCPRouteStatuses.Store(key, &tcpRoute.Status)
257+
message.HandleStore(message.Metadata{
258+
Runner: r.Name(),
259+
Message: message.TCPRouteStatusMessageName,
260+
},
261+
key, &tcpRoute.Status, &r.ProviderResources.TCPRouteStatuses)
234262
delete(statusesToDelete.TCPRouteStatusKeys, key)
235263
}
236264
for _, udpRoute := range result.UDPRoutes {
237265
key := utils.NamespacedName(udpRoute)
238-
r.ProviderResources.UDPRouteStatuses.Store(key, &udpRoute.Status)
266+
message.HandleStore(message.Metadata{
267+
Runner: r.Name(),
268+
Message: message.UDPRouteStatusMessageName,
269+
},
270+
key, &udpRoute.Status, &r.ProviderResources.UDPRouteStatuses)
239271
delete(statusesToDelete.UDPRouteStatusKeys, key)
240272
}
241273

@@ -246,43 +278,67 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
246278
for _, backendTLSPolicy := range result.BackendTLSPolicies {
247279
key := utils.NamespacedName(backendTLSPolicy)
248280
if !(reflect.ValueOf(backendTLSPolicy.Status).IsZero()) {
249-
r.ProviderResources.BackendTLSPolicyStatuses.Store(key, &backendTLSPolicy.Status)
281+
message.HandleStore(message.Metadata{
282+
Runner: r.Name(),
283+
Message: message.BackendTLSPolicyStatusMessageName,
284+
},
285+
key, &backendTLSPolicy.Status, &r.ProviderResources.BackendTLSPolicyStatuses)
250286
}
251287
delete(statusesToDelete.BackendTLSPolicyStatusKeys, key)
252288
}
253289

254290
for _, clientTrafficPolicy := range result.ClientTrafficPolicies {
255291
key := utils.NamespacedName(clientTrafficPolicy)
256292
if !(reflect.ValueOf(clientTrafficPolicy.Status).IsZero()) {
257-
r.ProviderResources.ClientTrafficPolicyStatuses.Store(key, &clientTrafficPolicy.Status)
293+
message.HandleStore(message.Metadata{
294+
Runner: r.Name(),
295+
Message: message.ClientTrafficPolicyStatusMessageName,
296+
},
297+
key, &clientTrafficPolicy.Status, &r.ProviderResources.ClientTrafficPolicyStatuses)
258298
}
259299
delete(statusesToDelete.ClientTrafficPolicyStatusKeys, key)
260300
}
261301
for _, backendTrafficPolicy := range result.BackendTrafficPolicies {
262302
key := utils.NamespacedName(backendTrafficPolicy)
263303
if !(reflect.ValueOf(backendTrafficPolicy.Status).IsZero()) {
264-
r.ProviderResources.BackendTrafficPolicyStatuses.Store(key, &backendTrafficPolicy.Status)
304+
message.HandleStore(message.Metadata{
305+
Runner: r.Name(),
306+
Message: message.BackendTrafficPolicyStatusMessageName,
307+
},
308+
key, &backendTrafficPolicy.Status, &r.ProviderResources.BackendTrafficPolicyStatuses)
265309
}
266310
delete(statusesToDelete.BackendTrafficPolicyStatusKeys, key)
267311
}
268312
for _, securityPolicy := range result.SecurityPolicies {
269313
key := utils.NamespacedName(securityPolicy)
270314
if !(reflect.ValueOf(securityPolicy.Status).IsZero()) {
271-
r.ProviderResources.SecurityPolicyStatuses.Store(key, &securityPolicy.Status)
315+
message.HandleStore(message.Metadata{
316+
Runner: r.Name(),
317+
Message: message.SecurityPolicyStatusMessageName,
318+
},
319+
key, &securityPolicy.Status, &r.ProviderResources.SecurityPolicyStatuses)
272320
}
273321
delete(statusesToDelete.SecurityPolicyStatusKeys, key)
274322
}
275323
for _, envoyExtensionPolicy := range result.EnvoyExtensionPolicies {
276324
key := utils.NamespacedName(envoyExtensionPolicy)
277325
if !(reflect.ValueOf(envoyExtensionPolicy.Status).IsZero()) {
278-
r.ProviderResources.EnvoyExtensionPolicyStatuses.Store(key, &envoyExtensionPolicy.Status)
326+
message.HandleStore(message.Metadata{
327+
Runner: r.Name(),
328+
Message: message.EnvoyExtensionPolicyStatusMessageName,
329+
},
330+
key, &envoyExtensionPolicy.Status, &r.ProviderResources.EnvoyExtensionPolicyStatuses)
279331
}
280332
delete(statusesToDelete.EnvoyExtensionPolicyStatusKeys, key)
281333
}
282334
for _, backend := range result.Backends {
283335
key := utils.NamespacedName(backend)
284336
if !(reflect.ValueOf(backend.Status).IsZero()) {
285-
r.ProviderResources.BackendStatuses.Store(key, &backend.Status)
337+
message.HandleStore(message.Metadata{
338+
Runner: r.Name(),
339+
Message: message.BackendStatusMessageName,
340+
},
341+
key, &backend.Status, &r.ProviderResources.BackendStatuses)
286342
}
287343
delete(statusesToDelete.BackendStatusKeys, key)
288344
}
@@ -293,7 +349,11 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re
293349
}
294350
if !(reflect.ValueOf(extServerPolicy.Object["status"]).IsZero()) {
295351
policyStatus := unstructuredToPolicyStatus(extServerPolicy.Object["status"].(map[string]any))
296-
r.ProviderResources.ExtensionPolicyStatuses.Store(key, &policyStatus)
352+
message.HandleStore(message.Metadata{
353+
Runner: r.Name(),
354+
Message: message.ExtensionServerPoliciesStatusMessageName,
355+
},
356+
key, &policyStatus, &r.ProviderResources.ExtensionPolicyStatuses)
297357
}
298358
delete(statusesToDelete.ExtensionServerPolicyStatusKeys, key)
299359
}

internal/globalratelimit/runner/runner.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
137137
rateLimitConfigsCache := map[string][]cachetype.Resource{}
138138

139139
// Subscribe to resources.
140-
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentGlobalRateLimitRunner), Message: "xds-ir"}, r.XdsIR.Subscribe(ctx),
140+
141+
message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, r.XdsIR.Subscribe(ctx),
141142
func(update message.Update[string, *ir.Xds], errChan chan error) {
142143
r.Logger.Info("received a notification")
143144

internal/infrastructure/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (r *Runner) Start(ctx context.Context) (err error) {
9595

9696
func (r *Runner) subscribeToProxyInfraIR(ctx context.Context, sub <-chan watchable.Snapshot[string, *ir.Infra]) {
9797
// Subscribe to resources
98-
message.HandleSubscription(message.Metadata{Runner: string(egv1a1.LogComponentInfrastructureRunner), Message: "infra-ir"}, sub,
98+
message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.InfraIRMessageName}, sub,
9999
func(update message.Update[string, *ir.Infra], errChan chan error) {
100100
r.Logger.Info("received an update")
101101
val := update.Value

internal/message/metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ var (
2929
"Total number of subscribed watchable queue.",
3030
)
3131

32+
watchablePublishTotal = metrics.NewCounter(
33+
"watchable_publish_total",
34+
"Total number of published updates to watchable queue.",
35+
)
36+
3237
runnerLabel = metrics.NewLabel("runner")
3338
messageLabel = metrics.NewLabel("message")
3439
)

internal/message/types.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,46 @@ type InfraIR struct {
137137
type Xds struct {
138138
watchable.Map[string, *xdstypes.ResourceVersionTable]
139139
}
140+
141+
type MessageName string
142+
143+
const (
144+
// XDSMessageName is a message containing xds translated from xds-ir
145+
XDSMessageName MessageName = "xds"
146+
// XDSIRMessageName is a message containing xds-ir translated from provider-resources
147+
XDSIRMessageName MessageName = "xds-ir"
148+
// InfraIRMessageName is a message containing infra-ir translated from provider-resources
149+
InfraIRMessageName MessageName = "infra-ir"
150+
// ProviderResourcesMessageName is a message containing gw-api and envoy gateway resources from the provider
151+
ProviderResourcesMessageName MessageName = "provider-resources"
152+
// BackendStatusMessageName is a message containing updates to Backend status
153+
BackendStatusMessageName MessageName = "backend-status"
154+
// ExtensionServerPoliciesStatusMessageName is a message containing updates to ExtensionServerPolicy status
155+
ExtensionServerPoliciesStatusMessageName MessageName = "extensionserverpolicies-status"
156+
// EnvoyExtensionPolicyStatusMessageName is a message containing updates to EnvoyExtensionPolicy status
157+
EnvoyExtensionPolicyStatusMessageName MessageName = "envoyextensionpolicy-status"
158+
// EnvoyPatchPolicyStatusMessageName is a message containing updates to EnvoyPatchPolicy status
159+
EnvoyPatchPolicyStatusMessageName MessageName = "envoypatchpolicy-status"
160+
// SecurityPolicyStatusMessageName is a message containing updates to SecurityPolicy status
161+
SecurityPolicyStatusMessageName MessageName = "securitypolicy-status"
162+
// BackendTrafficPolicyStatusMessageName is a message containing updates to BackendTrafficPolicy status
163+
BackendTrafficPolicyStatusMessageName MessageName = "backendtrafficpolicy-status"
164+
// ClientTrafficPolicyStatusMessageName is a message containing updates to ClientTrafficPolicy status
165+
ClientTrafficPolicyStatusMessageName MessageName = "clienttrafficpolicy-status"
166+
// BackendTLSPolicyStatusMessageName is a message containing updates to BackendTLSPolicy status
167+
BackendTLSPolicyStatusMessageName MessageName = "backendtlspolicy-status"
168+
// UDPRouteStatusMessageName is a message containing updates to UDPRoute status
169+
UDPRouteStatusMessageName MessageName = "udproute-status"
170+
// TCPRouteStatusMessageName is a message containing updates to TCPRoute status
171+
TCPRouteStatusMessageName MessageName = "tcproute-status"
172+
// TLSRouteStatusMessageName is a message containing updates to TLSRoute status
173+
TLSRouteStatusMessageName MessageName = "tlsroute-status"
174+
// GRPCRouteStatusMessageName is a message containing updates to GRPCRoute status
175+
GRPCRouteStatusMessageName MessageName = "grpcroute-status"
176+
// HTTPRouteStatusMessageName is a message containing updates to HTTPRoute status
177+
HTTPRouteStatusMessageName MessageName = "httproute-status"
178+
// GatewayStatusMessageName is a message containing updates to Gateway status
179+
GatewayStatusMessageName MessageName = "gateway-status"
180+
// GatewayClassStatusMessageName is a message containing updates to GatewayClass status
181+
GatewayClassStatusMessageName MessageName = "gatewayclass-status"
182+
)

internal/message/watchutil.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ var logger = logging.DefaultLogger(os.Stdout, egv1a1.LogLevelInfo).WithName("wat
2525

2626
type Metadata struct {
2727
Runner string
28-
Message string
28+
Message MessageName
2929
}
3030

3131
func (m Metadata) LabelValues() []metrics.LabelValue {
@@ -34,7 +34,7 @@ func (m Metadata) LabelValues() []metrics.LabelValue {
3434
labels = append(labels, runnerLabel.Value(m.Runner))
3535
}
3636
if m.Message != "" {
37-
labels = append(labels, messageLabel.Value(m.Message))
37+
labels = append(labels, messageLabel.Value(string(m.Message)))
3838
}
3939

4040
return labels
@@ -100,3 +100,8 @@ func HandleSubscription[K comparable, V any](
100100
}
101101
}
102102
}
103+
104+
func HandleStore[K comparable, V any](meta Metadata, key K, value V, publish *watchable.Map[K, V]) {
105+
publish.Store(key, value)
106+
watchablePublishTotal.WithSuccess(meta.LabelValues()...).Increment()
107+
}

internal/message/watchutil_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,39 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
9191
assert.Equal(t, 1, deleteCalls)
9292
}
9393

94+
func TestHandleStore(t *testing.T) {
95+
var m watchable.Map[string, any]
96+
message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "foo", "bar", &m)
97+
98+
endCtx, end := context.WithCancel(context.Background())
99+
go func() {
100+
<-endCtx.Done()
101+
message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "baz", "qux", &m)
102+
m.Delete("qux") // no-op
103+
message.HandleStore(message.Metadata{Runner: "demo", Message: "demo"}, "foo", "bar", &m) // no-op
104+
m.Delete("baz")
105+
time.Sleep(100 * time.Millisecond)
106+
m.Close()
107+
}()
108+
109+
var storeCalls int
110+
var deleteCalls int
111+
message.HandleSubscription[string, any](
112+
message.Metadata{Runner: "demo", Message: "demo"},
113+
m.Subscribe(context.Background()),
114+
func(update message.Update[string, any], errChans chan error) {
115+
end()
116+
if update.Delete {
117+
deleteCalls++
118+
} else {
119+
storeCalls++
120+
}
121+
},
122+
)
123+
assert.Equal(t, 2, storeCalls)
124+
assert.Equal(t, 1, deleteCalls)
125+
}
126+
94127
func TestXdsIRUpdates(t *testing.T) {
95128
tests := []struct {
96129
desc string

internal/provider/kubernetes/controller.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques
252252
false,
253253
string(gwapiv1.GatewayClassReasonInvalidParameters),
254254
msg)
255-
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)
255+
message.HandleStore(message.Metadata{
256+
Runner: string(egv1a1.LogComponentProviderRunner),
257+
Message: message.GatewayClassStatusMessageName,
258+
},
259+
utils.NamespacedName(gc), &gc.Status, &r.resources.GatewayClassStatuses)
256260
failToProcessGCParamsRef = true
257261
}
258262
}
@@ -270,7 +274,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques
270274
false,
271275
string(gwapiv1.GatewayClassReasonAccepted),
272276
fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err))
273-
r.resources.GatewayClassStatuses.Store(utils.NamespacedName(gc), &gc.Status)
277+
message.HandleStore(message.Metadata{
278+
Runner: string(egv1a1.LogComponentProviderRunner),
279+
Message: message.GatewayClassStatusMessageName,
280+
},
281+
utils.NamespacedName(gc), &gc.Status, &r.resources.GatewayClassStatuses)
274282
failToProcessGCParamsRef = true
275283
}
276284

@@ -466,7 +474,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques
466474
// The Store is triggered even when there are no Gateways associated to the
467475
// GatewayClass. This would happen in case the last Gateway is removed and the
468476
// Store will be required to trigger a cleanup of envoy infra resources.
469-
r.resources.GatewayAPIResources.Store(string(r.classController), &gwcResources)
477+
message.HandleStore(message.Metadata{
478+
Runner: string(egv1a1.LogComponentProviderRunner),
479+
Message: message.ProviderResourcesMessageName,
480+
},
481+
string(r.classController), &gwcResources, &r.resources.GatewayAPIResources)
470482

471483
r.log.Info("reconciled gateways successfully")
472484
return reconcile.Result{}, nil

0 commit comments

Comments
 (0)