Skip to content

Commit b09e513

Browse files
domsolutionslc525
andauthored
fix(operator): Blocking gRPC calls (#6898)
* fix: blocked on ServerNotify when expoentially backing off retrying * licence * fix hidden log * fix missing logs * log to make clear we've connected and not prevented by network issues * Update operator/controllers/mlops/seldonruntime_controller.go Co-authored-by: Lucian Carata <lc525@users.noreply.github.com> * PR comments * fix ctx cancelling prematurely --------- Co-authored-by: Lucian Carata <lc525@users.noreply.github.com>
1 parent 9c4cd51 commit b09e513

40 files changed

+466
-299
lines changed

operator/apis/mlops/v1alpha1/seldonconfig_types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,9 @@ func GetSeldonConfigForSeldonRuntime(seldonConfigName string, client client.Clie
159159
return nil, fmt.Errorf("SeldonConfig not specified and is required")
160160
}
161161
sc := SeldonConfig{}
162-
err := client.Get(context.TODO(), types.NamespacedName{Name: seldonConfigName, Namespace: constants.SeldonNamespace}, &sc)
162+
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPISingleCallTimeout)
163+
defer cancel()
164+
err := client.Get(ctx, types.NamespacedName{Name: seldonConfigName, Namespace: constants.SeldonNamespace}, &sc)
163165
return &sc, err
164166
}
165167

operator/apis/mlops/v1alpha1/serverconfig_types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ func GetServerConfigForServer(serverConfig string, client client.Client) (*Serve
7474
return nil, fmt.Errorf("ServerType not specified and is required")
7575
}
7676
sc := ServerConfig{}
77-
err := client.Get(context.TODO(), types.NamespacedName{Name: serverConfig, Namespace: constants.SeldonNamespace}, &sc)
77+
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPISingleCallTimeout)
78+
defer cancel()
79+
err := client.Get(ctx, types.NamespacedName{Name: serverConfig, Namespace: constants.SeldonNamespace}, &sc)
7880
if err != nil {
7981
return nil, fmt.Errorf("failed to get ServerConfig: %w", err)
8082
}

operator/controllers/mlops/experiment_controller.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package mlops
1111

1212
import (
1313
"context"
14+
"time"
1415

1516
"github.com/go-logr/logr"
1617
"k8s.io/apimachinery/pkg/api/errors"
@@ -82,10 +83,15 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr.
8283
// For more details, check Reconcile and its Result here:
8384
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
8485
func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
85-
logger := log.FromContext(ctx).WithName("Reconcile")
86+
logger := log.FromContext(ctx).WithName("ExperimentReconcile")
8687
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
8788
defer cancel()
8889

90+
now := time.Now()
91+
defer func() {
92+
logger.Info("Finished Experiment Reconcile", "duration", time.Since(now))
93+
}()
94+
8995
experiment := &mlopsv1alpha1.Experiment{}
9096
if err := r.Get(ctx, req.NamespacedName, experiment); err != nil {
9197
if errors.IsNotFound(err) {

operator/controllers/mlops/model_controller.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package mlops
1111

1212
import (
1313
"context"
14+
"time"
1415

1516
"github.com/go-logr/logr"
1617
"k8s.io/apimachinery/pkg/api/errors"
@@ -76,10 +77,15 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logge
7677
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch
7778

7879
func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
79-
logger := log.FromContext(ctx).WithName("Reconcile")
80+
logger := log.FromContext(ctx).WithName("ModelReconcile")
8081
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
8182
defer cancel()
8283

84+
now := time.Now()
85+
defer func() {
86+
logger.Info("Finished Model Reconcile", "duration", time.Since(now))
87+
}()
88+
8389
model := &mlopsv1alpha1.Model{}
8490
if err := r.Get(ctx, req.NamespacedName, model); err != nil {
8591
if errors.IsNotFound(err) {

operator/controllers/mlops/pipeline_controller.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package mlops
1111

1212
import (
1313
"context"
14+
"time"
1415

1516
"github.com/go-logr/logr"
1617
"k8s.io/apimachinery/pkg/api/errors"
@@ -92,10 +93,15 @@ func (r *PipelineReconciler) handleFinalizer(
9293
// For more details, check Reconcile and its Result here:
9394
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
9495
func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
95-
logger := log.FromContext(ctx).WithName("Reconcile")
96+
logger := log.FromContext(ctx).WithName("PipelineReconcile")
9697
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
9798
defer cancel()
9899

100+
now := time.Now()
101+
defer func() {
102+
logger.Info("Finished Pipeline Reconcile", "duration", time.Since(now))
103+
}()
104+
99105
pipeline := &mlopsv1alpha1.Pipeline{}
100106
if err := r.Get(ctx, req.NamespacedName, pipeline); err != nil {
101107
if errors.IsNotFound(err) {

operator/controllers/mlops/seldonruntime_controller.go

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package mlops
1212
import (
1313
"context"
1414
"fmt"
15+
"time"
1516

1617
"github.com/go-logr/logr"
1718
appsv1 "k8s.io/api/apps/v1"
@@ -45,10 +46,12 @@ type SeldonRuntimeReconciler struct {
4546
Recorder record.EventRecorder
4647
}
4748

48-
func (r *SeldonRuntimeReconciler) getNumberOfServers(namespace string) (int, error) {
49+
func (r *SeldonRuntimeReconciler) getNumberOfServers(ctx context.Context, namespace string) (int, error) {
4950
servers := mlopsv1alpha1.ServerList{}
51+
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout)
52+
defer cancel()
5053
inNamespace := client.InNamespace(namespace)
51-
err := r.List(context.TODO(), &servers, inNamespace)
54+
err := r.List(ctx, &servers, inNamespace)
5255
if err != nil {
5356
if errors.IsNotFound(err) {
5457
return 0, nil
@@ -69,7 +72,7 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo
6972
}
7073
}
7174
} else { // runtime is being deleted
72-
numServers, err := r.getNumberOfServers(runtime.Namespace)
75+
numServers, err := r.getNumberOfServers(ctx, runtime.Namespace)
7376
logger.Info("Runtime being deleted", "namespace", runtime.Namespace, "numServers", numServers)
7477
if err != nil {
7578
return true, err
@@ -119,10 +122,15 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo
119122
// For more details, check Reconcile and its Result here:
120123
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.4/pkg/reconcile
121124
func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
122-
logger := log.FromContext(ctx).WithName("Reconcile")
125+
logger := log.FromContext(ctx).WithName("SeldonRuntimeReconcile")
123126
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
124127
defer cancel()
125128

129+
now := time.Now()
130+
defer func() {
131+
logger.Info("Finished SeldonRuntime Reconcile", "duration", time.Since(now))
132+
}()
133+
126134
seldonRuntime := &mlopsv1alpha1.SeldonRuntime{}
127135
if err := r.Get(ctx, req.NamespacedName, seldonRuntime); err != nil {
128136
if errors.IsNotFound(err) {
@@ -141,9 +149,9 @@ func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
141149
}
142150

143151
sr, err := seldonreconcile.NewSeldonRuntimeReconciler(
152+
ctx,
144153
seldonRuntime,
145154
common.ReconcilerConfig{
146-
Ctx: ctx,
147155
Logger: logger,
148156
Client: r.Client,
149157
Recorder: r.Recorder,
@@ -160,7 +168,7 @@ func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
160168
return reconcile.Result{}, err
161169
}
162170

163-
err = sr.Reconcile()
171+
err = sr.Reconcile(ctx)
164172
if err != nil {
165173
return reconcile.Result{}, err
166174
}
@@ -187,42 +195,47 @@ func seldonRuntimeReady(status mlopsv1alpha1.SeldonRuntimeStatus) bool {
187195
func (r *SeldonRuntimeReconciler) updateStatus(seldonRuntime *mlopsv1alpha1.SeldonRuntime, logger logr.Logger) error {
188196
existingRuntime := &mlopsv1alpha1.SeldonRuntime{}
189197
namespacedName := types.NamespacedName{Name: seldonRuntime.Name, Namespace: seldonRuntime.Namespace}
190-
if err := r.Get(context.TODO(), namespacedName, existingRuntime); err != nil {
198+
199+
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
200+
defer cancel()
201+
if err := r.Get(ctx, namespacedName, existingRuntime); err != nil {
191202
if apimachinary_errors.IsNotFound(err) { //Ignore NotFound errors
192203
return nil
193204
}
194205
return err
195206
}
196207

197208
if equality.Semantic.DeepEqual(existingRuntime.Status, seldonRuntime.Status) {
209+
return nil
198210
// Not updating as no difference
211+
}
212+
213+
if err := r.Status().Update(ctx, seldonRuntime); err != nil {
214+
logger.Info("Failed to update status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
215+
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "UpdateFailed",
216+
"Failed to update status for SeldonRuntime %q: %v", seldonRuntime.Name, err)
217+
return err
199218
} else {
200-
if err := r.Status().Update(context.TODO(), seldonRuntime); err != nil {
201-
logger.Info("Failed to update status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
202-
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "UpdateFailed",
203-
"Failed to update status for SeldonRuntime %q: %v", seldonRuntime.Name, err)
204-
return err
205-
} else {
206-
logger.Info("Successfully updated status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
207-
prevWasReady := seldonRuntimeReady(existingRuntime.Status)
208-
currentIsReady := seldonRuntimeReady(seldonRuntime.Status)
209-
if prevWasReady && !currentIsReady {
210-
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "SeldonRuntimeNotReady",
211-
fmt.Sprintf("SeldonRuntime %v is no longer Ready", seldonRuntime.GetName()))
212-
} else if !prevWasReady && currentIsReady {
213-
r.Recorder.Eventf(seldonRuntime, v1.EventTypeNormal, "RuntimeReady",
214-
fmt.Sprintf("SeldonRuntime %v is Ready", seldonRuntime.GetName()))
215-
}
219+
logger.Info("Successfully updated status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
220+
prevWasReady := seldonRuntimeReady(existingRuntime.Status)
221+
currentIsReady := seldonRuntimeReady(seldonRuntime.Status)
222+
if prevWasReady && !currentIsReady {
223+
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "SeldonRuntimeNotReady",
224+
fmt.Sprintf("SeldonRuntime %v is no longer Ready", seldonRuntime.GetName()))
225+
} else if !prevWasReady && currentIsReady {
226+
r.Recorder.Eventf(seldonRuntime, v1.EventTypeNormal, "RuntimeReady",
227+
fmt.Sprintf("SeldonRuntime %v is Ready", seldonRuntime.GetName()))
216228
}
217229
}
230+
218231
return nil
219232
}
220233

221234
// Find SeldonRuntimes that reference the changes SeldonConfig
222-
// TODO: pass an actual context from the caller to be used here
223-
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(_ context.Context, obj client.Object) []reconcile.Request {
224-
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
235+
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(ctx context.Context, obj client.Object) []reconcile.Request {
236+
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout)
225237
defer cancel()
238+
226239
logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromSeldonConfig")
227240
var seldonRuntimes mlopsv1alpha1.SeldonRuntimeList
228241
if err := r.Client.List(ctx, &seldonRuntimes); err != nil {
@@ -263,9 +276,10 @@ func (r *SeldonRuntimeReconciler) mapSeldonRuntimesByNamespace(ctx context.Conte
263276
return req
264277
}
265278

266-
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(_ context.Context, obj client.Object) []reconcile.Request {
267-
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
279+
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(ctx context.Context, obj client.Object) []reconcile.Request {
280+
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout)
268281
defer cancel()
282+
269283
logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromPipeline")
270284
pipeline, ok := obj.(*mlopsv1alpha1.Pipeline)
271285
if !ok {
@@ -275,9 +289,10 @@ func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(_ context.Contex
275289
return r.mapSeldonRuntimesByNamespace(ctx, pipeline.Namespace, logger)
276290
}
277291

278-
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromModel(_ context.Context, obj client.Object) []reconcile.Request {
279-
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
292+
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromModel(ctx context.Context, obj client.Object) []reconcile.Request {
293+
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout)
280294
defer cancel()
295+
281296
logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromModel")
282297
model, ok := obj.(*mlopsv1alpha1.Model)
283298
if !ok {

operator/controllers/mlops/server_controller.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package mlops
1212
import (
1313
"context"
1414
"fmt"
15+
"time"
1516

1617
"github.com/go-logr/logr"
1718
appsv1 "k8s.io/api/apps/v1"
@@ -68,10 +69,15 @@ type ServerReconciler struct {
6869
// For more details, check Reconcile and its Result here:
6970
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
7071
func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
71-
logger := log.FromContext(ctx).WithName("Reconcile")
72+
logger := log.FromContext(ctx).WithName("ServerReconcile")
7273
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
7374
defer cancel()
7475

76+
now := time.Now()
77+
defer func() {
78+
logger.Info("Finished Server Reconcile", "duration", time.Since(now))
79+
}()
80+
7581
logger.Info("Received reconcile for Server", "name", req.Name, "namespace", req.NamespacedName.Namespace)
7682

7783
server := &mlopsv1alpha1.Server{}
@@ -110,13 +116,11 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
110116
var sr common.Reconciler
111117
if r.UseDeploymentsForServers {
112118
sr, err = serverreconcile.NewServerReconcilerWithDeployment(server, common.ReconcilerConfig{
113-
Ctx: ctx,
114119
Logger: logger,
115120
Client: r.Client,
116121
})
117122
} else {
118123
sr, err = serverreconcile.NewServerReconciler(server, common.ReconcilerConfig{
119-
Ctx: ctx,
120124
Logger: logger,
121125
Client: r.Client,
122126
})
@@ -136,7 +140,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
136140
}
137141

138142
// attempt to deploy
139-
err = sr.Reconcile()
143+
err = sr.Reconcile(ctx)
140144
if err != nil {
141145
logger.Error(err, "Failed reconciling", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec)
142146
r.updateStatusFromError(ctx, logger, server, err)
@@ -162,7 +166,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
162166
server.Status.Selector = selector
163167
server.Status.Replicas = *server.Spec.Replicas
164168

165-
err = r.updateStatus(server)
169+
err = r.updateStatus(ctx, server)
166170
if err != nil {
167171
logger.Error(err, "Failed updating status", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec)
168172
return reconcile.Result{}, err
@@ -184,10 +188,10 @@ func (r *ServerReconciler) updateStatusFromError(ctx context.Context, logger log
184188
}
185189
}
186190

187-
func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error {
191+
func (r *ServerReconciler) updateStatus(ctx context.Context, server *mlopsv1alpha1.Server) error {
188192
existingServer := &mlopsv1alpha1.Server{}
189193
namespacedName := types.NamespacedName{Name: server.Name, Namespace: server.Namespace}
190-
if err := r.Get(context.TODO(), namespacedName, existingServer); err != nil {
194+
if err := r.Get(ctx, namespacedName, existingServer); err != nil {
191195
if apimachinary_errors.IsNotFound(err) { //Ignore NotFound errors
192196
return nil
193197
}
@@ -197,7 +201,7 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error {
197201
if equality.Semantic.DeepEqual(existingServer.Status, server.Status) {
198202
// Not updating as no difference
199203
} else {
200-
if err := r.Status().Update(context.TODO(), server); err != nil {
204+
if err := r.Status().Update(ctx, server); err != nil {
201205
r.Recorder.Eventf(server, v1.EventTypeWarning, "UpdateFailed",
202206
"Failed to update status for Model %q: %v", server.Name, err)
203207
return err
@@ -217,10 +221,10 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error {
217221
}
218222

219223
// Find Servers that need reconcilliation from a change to a given ServerConfig
220-
// TODO: pass an actual context from the caller to be used here
221-
func (r *ServerReconciler) mapServerFromServerConfig(_ context.Context, obj client.Object) []reconcile.Request {
222-
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
224+
func (r *ServerReconciler) mapServerFromServerConfig(ctx context.Context, obj client.Object) []reconcile.Request {
225+
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout)
223226
defer cancel()
227+
224228
logger := log.FromContext(ctx).WithName("mapServerFromServerConfig")
225229
var servers mlopsv1alpha1.ServerList
226230
if err := r.Client.List(ctx, &servers); err != nil {

operator/controllers/mlops/serverconfig_controller.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ type ServerConfigReconciler struct {
3939
// For more details, check Reconcile and its Result here:
4040
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
4141
func (r *ServerConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
42-
4342
return ctrl.Result{}, nil
4443
}
4544

operator/controllers/reconcilers/common/reconciler.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
)
2121

2222
type Reconciler interface {
23-
Reconcile() error
23+
Reconcile(ctx context.Context) error
2424
GetResources() []client.Object
2525
GetConditions() []*apis.Condition
2626
}
@@ -42,15 +42,14 @@ func CopyMap[K, V comparable](m map[K]V) map[K]V {
4242
}
4343

4444
type ReplicaHandler interface {
45-
GetReplicas() (int32, error)
45+
GetReplicas(ctx context.Context) (int32, error)
4646
}
4747

4848
type LabelHandler interface {
4949
GetLabelSelector() string
5050
}
5151

5252
type ReconcilerConfig struct {
53-
Ctx context.Context
5453
Client client.Client
5554
Recorder record.EventRecorder
5655
Logger logr.Logger

0 commit comments

Comments
 (0)