diff --git a/operator/apis/mlops/v1alpha1/seldonconfig_types.go b/operator/apis/mlops/v1alpha1/seldonconfig_types.go index 198eed5166..e46050f0a5 100644 --- a/operator/apis/mlops/v1alpha1/seldonconfig_types.go +++ b/operator/apis/mlops/v1alpha1/seldonconfig_types.go @@ -159,7 +159,9 @@ func GetSeldonConfigForSeldonRuntime(seldonConfigName string, client client.Clie return nil, fmt.Errorf("SeldonConfig not specified and is required") } sc := SeldonConfig{} - err := client.Get(context.TODO(), types.NamespacedName{Name: seldonConfigName, Namespace: constants.SeldonNamespace}, &sc) + ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPISingleCallTimeout) + defer cancel() + err := client.Get(ctx, types.NamespacedName{Name: seldonConfigName, Namespace: constants.SeldonNamespace}, &sc) return &sc, err } diff --git a/operator/apis/mlops/v1alpha1/serverconfig_types.go b/operator/apis/mlops/v1alpha1/serverconfig_types.go index 5a8c985eb6..f8a009d2b7 100644 --- a/operator/apis/mlops/v1alpha1/serverconfig_types.go +++ b/operator/apis/mlops/v1alpha1/serverconfig_types.go @@ -74,7 +74,9 @@ func GetServerConfigForServer(serverConfig string, client client.Client) (*Serve return nil, fmt.Errorf("ServerType not specified and is required") } sc := ServerConfig{} - err := client.Get(context.TODO(), types.NamespacedName{Name: serverConfig, Namespace: constants.SeldonNamespace}, &sc) + ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPISingleCallTimeout) + defer cancel() + err := client.Get(ctx, types.NamespacedName{Name: serverConfig, Namespace: constants.SeldonNamespace}, &sc) if err != nil { return nil, fmt.Errorf("failed to get ServerConfig: %w", err) } diff --git a/operator/controllers/mlops/experiment_controller.go b/operator/controllers/mlops/experiment_controller.go index 49a2d09151..6dc7545a9e 100644 --- a/operator/controllers/mlops/experiment_controller.go +++ b/operator/controllers/mlops/experiment_controller.go @@ -11,6 +11,7 @@ package mlops import ( "context" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" @@ -82,10 +83,15 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr. // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithName("Reconcile") + logger := log.FromContext(ctx).WithName("ExperimentReconcile") ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) defer cancel() + now := time.Now() + defer func() { + logger.Info("Finished Experiment Reconcile", "duration", time.Since(now)) + }() + experiment := &mlopsv1alpha1.Experiment{} if err := r.Get(ctx, req.NamespacedName, experiment); err != nil { if errors.IsNotFound(err) { diff --git a/operator/controllers/mlops/model_controller.go b/operator/controllers/mlops/model_controller.go index 4d0fed2a29..4a7d25ce2c 100644 --- a/operator/controllers/mlops/model_controller.go +++ b/operator/controllers/mlops/model_controller.go @@ -11,6 +11,7 @@ package mlops import ( "context" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" @@ -76,10 +77,15 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logge //+kubebuilder:rbac:groups="",resources=events,verbs=create;patch func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithName("Reconcile") + logger := log.FromContext(ctx).WithName("ModelReconcile") ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) defer cancel() + now := time.Now() + defer func() { + logger.Info("Finished Model Reconcile", "duration", time.Since(now)) + }() + model := &mlopsv1alpha1.Model{} if err := r.Get(ctx, req.NamespacedName, model); err != nil { if errors.IsNotFound(err) { diff --git a/operator/controllers/mlops/pipeline_controller.go b/operator/controllers/mlops/pipeline_controller.go index 1f858e7fad..be1a9f91ee 100644 --- a/operator/controllers/mlops/pipeline_controller.go +++ b/operator/controllers/mlops/pipeline_controller.go @@ -11,6 +11,7 @@ package mlops import ( "context" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" @@ -92,10 +93,15 @@ func (r *PipelineReconciler) handleFinalizer( // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithName("Reconcile") + logger := log.FromContext(ctx).WithName("PipelineReconcile") ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) defer cancel() + now := time.Now() + defer func() { + logger.Info("Finished Pipeline Reconcile", "duration", time.Since(now)) + }() + pipeline := &mlopsv1alpha1.Pipeline{} if err := r.Get(ctx, req.NamespacedName, pipeline); err != nil { if errors.IsNotFound(err) { diff --git a/operator/controllers/mlops/seldonruntime_controller.go b/operator/controllers/mlops/seldonruntime_controller.go index 52d5a4fd3b..0c56251357 100644 --- a/operator/controllers/mlops/seldonruntime_controller.go +++ b/operator/controllers/mlops/seldonruntime_controller.go @@ -12,6 +12,7 @@ package mlops import ( "context" "fmt" + "time" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" @@ -45,10 +46,12 @@ type SeldonRuntimeReconciler struct { Recorder record.EventRecorder } -func (r *SeldonRuntimeReconciler) getNumberOfServers(namespace string) (int, error) { +func (r *SeldonRuntimeReconciler) getNumberOfServers(ctx context.Context, namespace string) (int, error) { servers := mlopsv1alpha1.ServerList{} + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() inNamespace := client.InNamespace(namespace) - err := r.List(context.TODO(), &servers, inNamespace) + err := r.List(ctx, &servers, inNamespace) if err != nil { if errors.IsNotFound(err) { return 0, nil @@ -69,7 +72,7 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo } } } else { // runtime is being deleted - numServers, err := r.getNumberOfServers(runtime.Namespace) + numServers, err := r.getNumberOfServers(ctx, runtime.Namespace) logger.Info("Runtime being deleted", "namespace", runtime.Namespace, "numServers", numServers) if err != nil { return true, err @@ -119,10 +122,15 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.4/pkg/reconcile func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithName("Reconcile") + logger := log.FromContext(ctx).WithName("SeldonRuntimeReconcile") ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) defer cancel() + now := time.Now() + defer func() { + logger.Info("Finished SeldonRuntime Reconcile", "duration", time.Since(now)) + }() + seldonRuntime := &mlopsv1alpha1.SeldonRuntime{} if err := r.Get(ctx, req.NamespacedName, seldonRuntime); err != nil { if errors.IsNotFound(err) { @@ -141,9 +149,9 @@ func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reques } sr, err := seldonreconcile.NewSeldonRuntimeReconciler( + ctx, seldonRuntime, common.ReconcilerConfig{ - Ctx: ctx, Logger: logger, Client: r.Client, Recorder: r.Recorder, @@ -160,7 +168,7 @@ func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reques return reconcile.Result{}, err } - err = sr.Reconcile() + err = sr.Reconcile(ctx) if err != nil { return reconcile.Result{}, err } @@ -187,7 +195,10 @@ func seldonRuntimeReady(status mlopsv1alpha1.SeldonRuntimeStatus) bool { func (r *SeldonRuntimeReconciler) updateStatus(seldonRuntime *mlopsv1alpha1.SeldonRuntime, logger logr.Logger) error { existingRuntime := &mlopsv1alpha1.SeldonRuntime{} namespacedName := types.NamespacedName{Name: seldonRuntime.Name, Namespace: seldonRuntime.Namespace} - if err := r.Get(context.TODO(), namespacedName, existingRuntime); err != nil { + + ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout) + defer cancel() + if err := r.Get(ctx, namespacedName, existingRuntime); err != nil { if apimachinary_errors.IsNotFound(err) { //Ignore NotFound errors return nil } @@ -195,34 +206,36 @@ func (r *SeldonRuntimeReconciler) updateStatus(seldonRuntime *mlopsv1alpha1.Seld } if equality.Semantic.DeepEqual(existingRuntime.Status, seldonRuntime.Status) { + return nil // Not updating as no difference + } + + if err := r.Status().Update(ctx, seldonRuntime); err != nil { + logger.Info("Failed to update status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace) + r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "UpdateFailed", + "Failed to update status for SeldonRuntime %q: %v", seldonRuntime.Name, err) + return err } else { - if err := r.Status().Update(context.TODO(), seldonRuntime); err != nil { - logger.Info("Failed to update status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace) - r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "UpdateFailed", - "Failed to update status for SeldonRuntime %q: %v", seldonRuntime.Name, err) - return err - } else { - logger.Info("Successfully updated status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace) - prevWasReady := seldonRuntimeReady(existingRuntime.Status) - currentIsReady := seldonRuntimeReady(seldonRuntime.Status) - if prevWasReady && !currentIsReady { - r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "SeldonRuntimeNotReady", - fmt.Sprintf("SeldonRuntime %v is no longer Ready", seldonRuntime.GetName())) - } else if !prevWasReady && currentIsReady { - r.Recorder.Eventf(seldonRuntime, v1.EventTypeNormal, "RuntimeReady", - fmt.Sprintf("SeldonRuntime %v is Ready", seldonRuntime.GetName())) - } + logger.Info("Successfully updated status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace) + prevWasReady := seldonRuntimeReady(existingRuntime.Status) + currentIsReady := seldonRuntimeReady(seldonRuntime.Status) + if prevWasReady && !currentIsReady { + r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "SeldonRuntimeNotReady", + fmt.Sprintf("SeldonRuntime %v is no longer Ready", seldonRuntime.GetName())) + } else if !prevWasReady && currentIsReady { + r.Recorder.Eventf(seldonRuntime, v1.EventTypeNormal, "RuntimeReady", + fmt.Sprintf("SeldonRuntime %v is Ready", seldonRuntime.GetName())) } } + return nil } // Find SeldonRuntimes that reference the changes SeldonConfig -// TODO: pass an actual context from the caller to be used here -func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(_ context.Context, obj client.Object) []reconcile.Request { - ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout) +func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(ctx context.Context, obj client.Object) []reconcile.Request { + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) defer cancel() + logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromSeldonConfig") var seldonRuntimes mlopsv1alpha1.SeldonRuntimeList if err := r.Client.List(ctx, &seldonRuntimes); err != nil { @@ -263,9 +276,10 @@ func (r *SeldonRuntimeReconciler) mapSeldonRuntimesByNamespace(ctx context.Conte return req } -func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(_ context.Context, obj client.Object) []reconcile.Request { - ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout) +func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(ctx context.Context, obj client.Object) []reconcile.Request { + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) defer cancel() + logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromPipeline") pipeline, ok := obj.(*mlopsv1alpha1.Pipeline) if !ok { @@ -275,9 +289,10 @@ func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(_ context.Contex return r.mapSeldonRuntimesByNamespace(ctx, pipeline.Namespace, logger) } -func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromModel(_ context.Context, obj client.Object) []reconcile.Request { - ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout) +func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromModel(ctx context.Context, obj client.Object) []reconcile.Request { + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout) defer cancel() + logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromModel") model, ok := obj.(*mlopsv1alpha1.Model) if !ok { diff --git a/operator/controllers/mlops/server_controller.go b/operator/controllers/mlops/server_controller.go index d50c267b59..a130b4f180 100644 --- a/operator/controllers/mlops/server_controller.go +++ b/operator/controllers/mlops/server_controller.go @@ -12,6 +12,7 @@ package mlops import ( "context" "fmt" + "time" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" @@ -68,10 +69,15 @@ type ServerReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx).WithName("Reconcile") + logger := log.FromContext(ctx).WithName("ServerReconcile") ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout) defer cancel() + now := time.Now() + defer func() { + logger.Info("Finished Server Reconcile", "duration", time.Since(now)) + }() + logger.Info("Received reconcile for Server", "name", req.Name, "namespace", req.NamespacedName.Namespace) server := &mlopsv1alpha1.Server{} @@ -110,13 +116,11 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr var sr common.Reconciler if r.UseDeploymentsForServers { sr, err = serverreconcile.NewServerReconcilerWithDeployment(server, common.ReconcilerConfig{ - Ctx: ctx, Logger: logger, Client: r.Client, }) } else { sr, err = serverreconcile.NewServerReconciler(server, common.ReconcilerConfig{ - Ctx: ctx, Logger: logger, Client: r.Client, }) @@ -136,7 +140,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } // attempt to deploy - err = sr.Reconcile() + err = sr.Reconcile(ctx) if err != nil { logger.Error(err, "Failed reconciling", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec) r.updateStatusFromError(ctx, logger, server, err) @@ -162,7 +166,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr server.Status.Selector = selector server.Status.Replicas = *server.Spec.Replicas - err = r.updateStatus(server) + err = r.updateStatus(ctx, server) if err != nil { logger.Error(err, "Failed updating status", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec) return reconcile.Result{}, err @@ -184,10 +188,10 @@ func (r *ServerReconciler) updateStatusFromError(ctx context.Context, logger log } } -func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error { +func (r *ServerReconciler) updateStatus(ctx context.Context, server *mlopsv1alpha1.Server) error { existingServer := &mlopsv1alpha1.Server{} namespacedName := types.NamespacedName{Name: server.Name, Namespace: server.Namespace} - if err := r.Get(context.TODO(), namespacedName, existingServer); err != nil { + if err := r.Get(ctx, namespacedName, existingServer); err != nil { if apimachinary_errors.IsNotFound(err) { //Ignore NotFound errors return nil } @@ -197,7 +201,7 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error { if equality.Semantic.DeepEqual(existingServer.Status, server.Status) { // Not updating as no difference } else { - if err := r.Status().Update(context.TODO(), server); err != nil { + if err := r.Status().Update(ctx, server); err != nil { r.Recorder.Eventf(server, v1.EventTypeWarning, "UpdateFailed", "Failed to update status for Model %q: %v", server.Name, err) return err @@ -217,10 +221,10 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error { } // Find Servers that need reconcilliation from a change to a given ServerConfig -// TODO: pass an actual context from the caller to be used here -func (r *ServerReconciler) mapServerFromServerConfig(_ context.Context, obj client.Object) []reconcile.Request { - ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout) +func (r *ServerReconciler) mapServerFromServerConfig(ctx context.Context, obj client.Object) []reconcile.Request { + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) defer cancel() + logger := log.FromContext(ctx).WithName("mapServerFromServerConfig") var servers mlopsv1alpha1.ServerList if err := r.Client.List(ctx, &servers); err != nil { diff --git a/operator/controllers/mlops/serverconfig_controller.go b/operator/controllers/mlops/serverconfig_controller.go index fd8f710875..e3c0747882 100644 --- a/operator/controllers/mlops/serverconfig_controller.go +++ b/operator/controllers/mlops/serverconfig_controller.go @@ -39,7 +39,6 @@ type ServerConfigReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile func (r *ServerConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - return ctrl.Result{}, nil } diff --git a/operator/controllers/reconcilers/common/reconciler.go b/operator/controllers/reconcilers/common/reconciler.go index 553dcb4435..ca4a53081d 100644 --- a/operator/controllers/reconcilers/common/reconciler.go +++ b/operator/controllers/reconcilers/common/reconciler.go @@ -20,7 +20,7 @@ import ( ) type Reconciler interface { - Reconcile() error + Reconcile(ctx context.Context) error GetResources() []client.Object GetConditions() []*apis.Condition } @@ -42,7 +42,7 @@ func CopyMap[K, V comparable](m map[K]V) map[K]V { } type ReplicaHandler interface { - GetReplicas() (int32, error) + GetReplicas(ctx context.Context) (int32, error) } type LabelHandler interface { @@ -50,7 +50,6 @@ type LabelHandler interface { } type ReconcilerConfig struct { - Ctx context.Context Client client.Client Recorder record.EventRecorder Logger logr.Logger diff --git a/operator/controllers/reconcilers/seldon/configmap_reconciler.go b/operator/controllers/reconcilers/seldon/configmap_reconciler.go index d0ce62ea62..ecb8b8147b 100644 --- a/operator/controllers/reconcilers/seldon/configmap_reconciler.go +++ b/operator/controllers/reconcilers/seldon/configmap_reconciler.go @@ -156,9 +156,11 @@ func getTracingConfigMap(tracingConfig mlopsv1alpha1.TracingConfig, namespace st }, nil } -func (s *ConfigMapReconciler) getReconcileOperation(idx int, configMap *v1.ConfigMap) (constants.ReconcileOperation, error) { +func (s *ConfigMapReconciler) getReconcileOperation(ctx context.Context, idx int, configMap *v1.ConfigMap) (constants.ReconcileOperation, error) { found := &v1.ConfigMap{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: configMap.GetName(), Namespace: configMap.GetNamespace(), }, found) @@ -178,21 +180,21 @@ func (s *ConfigMapReconciler) getReconcileOperation(idx int, configMap *v1.Confi return constants.ReconcileUpdateNeeded, nil } -func (s *ConfigMapReconciler) Reconcile() error { +func (s *ConfigMapReconciler) Reconcile(ctx context.Context) error { logger := s.Logger.WithName("ConfigMapReconcile") for idx, configMap := range s.configMaps { - op, err := s.getReconcileOperation(idx, configMap) + op, err := s.getReconcileOperation(ctx, idx, configMap) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("ConfigMap Create", "Name", configMap.GetName(), "Namespace", configMap.GetNamespace()) - err = s.Client.Create(s.Ctx, configMap) + err = s.Client.Create(ctx, configMap) if err != nil { logger.Error(err, "Failed to create configmap", "Name", configMap.GetName(), "Namespace", configMap.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("ConfigMap Update", "Name", configMap.GetName(), "Namespace", configMap.GetNamespace()) - err = s.Client.Update(s.Ctx, configMap) + err = s.Client.Update(ctx, configMap) if err != nil { logger.Error(err, "Failed to update configmap", "Name", configMap.GetName(), "Namespace", configMap.GetNamespace()) return err diff --git a/operator/controllers/reconcilers/seldon/configmap_reconciler_test.go b/operator/controllers/reconcilers/seldon/configmap_reconciler_test.go index 29106273cd..3286199f78 100644 --- a/operator/controllers/reconcilers/seldon/configmap_reconciler_test.go +++ b/operator/controllers/reconcilers/seldon/configmap_reconciler_test.go @@ -87,11 +87,11 @@ func TestConfigMapReconcile(t *testing.T) { } client = testing2.NewFakeClient(scheme) sr, err := NewConfigMapReconciler( - common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}, + common.ReconcilerConfig{Logger: logger, Client: client}, test.config, meta) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.TODO()) g.Expect(err).To(BeNil()) for _, configMapName := range test.expectedConfigMaps { diff --git a/operator/controllers/reconcilers/seldon/deployment_reconciler.go b/operator/controllers/reconcilers/seldon/deployment_reconciler.go index 52f85b5b63..effd430bd4 100644 --- a/operator/controllers/reconcilers/seldon/deployment_reconciler.go +++ b/operator/controllers/reconcilers/seldon/deployment_reconciler.go @@ -107,9 +107,11 @@ func (s *ComponentDeploymentReconciler) GetLabelSelector() string { return fmt.Sprintf("%s=%s", constants.KubernetesNameLabelKey, s.Name) } -func (s *ComponentDeploymentReconciler) getReconcileOperation() (constants.ReconcileOperation, error) { +func (s *ComponentDeploymentReconciler) getReconcileOperation(ctx context.Context) (constants.ReconcileOperation, error) { found := &appsv1.Deployment{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: s.Deployment.GetName(), Namespace: s.Deployment.GetNamespace(), }, found) @@ -146,20 +148,20 @@ func (s *ComponentDeploymentReconciler) getReconcileOperation() (constants.Recon return constants.ReconcileUpdateNeeded, nil } -func (s *ComponentDeploymentReconciler) Reconcile() error { +func (s *ComponentDeploymentReconciler) Reconcile(ctx context.Context) error { logger := s.Logger.WithName("DeploymentReconcile") - op, err := s.getReconcileOperation() + op, err := s.getReconcileOperation(ctx) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("Deployment Create", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) - err = s.Client.Create(s.Ctx, s.Deployment) + err = s.Client.Create(ctx, s.Deployment) if err != nil { logger.Error(err, "Failed to create Deployment", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("Deployment Update", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) - err = s.Client.Update(s.Ctx, s.Deployment) + err = s.Client.Update(ctx, s.Deployment) if err != nil { logger.Error(err, "Failed to update statefuleset", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) return err diff --git a/operator/controllers/reconcilers/seldon/deployment_reconciler_test.go b/operator/controllers/reconcilers/seldon/deployment_reconciler_test.go index bb9481f45d..71f85b9d27 100644 --- a/operator/controllers/reconcilers/seldon/deployment_reconciler_test.go +++ b/operator/controllers/reconcilers/seldon/deployment_reconciler_test.go @@ -133,7 +133,7 @@ func TestDeploymentReconcile(t *testing.T) { component, _ = ComponentOverride(component, test.override) sr, err := NewComponentDeploymentReconciler( test.deploymentName, - common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}, + common.ReconcilerConfig{Logger: logger, Client: client}, meta, *component.Replicas, component.PodSpec, @@ -142,7 +142,7 @@ func TestDeploymentReconcile(t *testing.T) { test.seldonConfigMeta, annotator) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.TODO()) if test.error { g.Expect(err).ToNot(BeNil()) } else { @@ -248,7 +248,7 @@ func TestDeploymentOverride(t *testing.T) { component, _ = ComponentOverride(component, test.override) sr, err := NewComponentDeploymentReconciler( test.deploymentName, - common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}, + common.ReconcilerConfig{Logger: logger, Client: client}, meta, *component.Replicas, component.PodSpec, @@ -257,7 +257,7 @@ func TestDeploymentOverride(t *testing.T) { metav1.ObjectMeta{}, annotator) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.TODO()) g.Expect(err).To(BeNil()) dep := &appsv1.Deployment{} diff --git a/operator/controllers/reconcilers/seldon/rbac_reconciler.go b/operator/controllers/reconcilers/seldon/rbac_reconciler.go index f6837e5417..03ab4dd002 100644 --- a/operator/controllers/reconcilers/seldon/rbac_reconciler.go +++ b/operator/controllers/reconcilers/seldon/rbac_reconciler.go @@ -11,6 +11,7 @@ package server import ( "context" + "fmt" v1 "k8s.io/api/core/v1" auth "k8s.io/api/rbac/v1" @@ -211,9 +212,11 @@ func getRoles(meta metav1.ObjectMeta) []*auth.Role { } } -func (s *ComponentRBACReconciler) getReconcileOperationForRole(idx int, role *auth.Role) (constants.ReconcileOperation, error) { +func (s *ComponentRBACReconciler) getReconcileOperationForRole(ctx context.Context, idx int, role *auth.Role) (constants.ReconcileOperation, error) { found := &auth.Role{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: role.GetName(), Namespace: role.GetNamespace(), }, found) @@ -233,9 +236,11 @@ func (s *ComponentRBACReconciler) getReconcileOperationForRole(idx int, role *au return constants.ReconcileUpdateNeeded, nil } -func (s *ComponentRBACReconciler) getReconcileOperationForRoleBinding(idx int, roleBinding *auth.RoleBinding) (constants.ReconcileOperation, error) { +func (s *ComponentRBACReconciler) getReconcileOperationForRoleBinding(ctx context.Context, idx int, roleBinding *auth.RoleBinding) (constants.ReconcileOperation, error) { found := &auth.RoleBinding{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: roleBinding.GetName(), Namespace: roleBinding.GetNamespace(), }, found) @@ -255,9 +260,11 @@ func (s *ComponentRBACReconciler) getReconcileOperationForRoleBinding(idx int, r s.Roles[idx].SetResourceVersion(found.ResourceVersion) return constants.ReconcileUpdateNeeded, nil } -func (s *ComponentRBACReconciler) getReconcileOperationForServiceAccount(idx int, serviceAccount *v1.ServiceAccount) (constants.ReconcileOperation, error) { +func (s *ComponentRBACReconciler) getReconcileOperationForServiceAccount(ctx context.Context, serviceAccount *v1.ServiceAccount) (constants.ReconcileOperation, error) { found := &v1.ServiceAccount{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: serviceAccount.GetName(), Namespace: serviceAccount.GetNamespace(), }, found) @@ -271,21 +278,21 @@ func (s *ComponentRBACReconciler) getReconcileOperationForServiceAccount(idx int return constants.ReconcileNoChange, nil } -func (s *ComponentRBACReconciler) reconcileRoles() error { +func (s *ComponentRBACReconciler) reconcileRoles(ctx context.Context) error { logger := s.Logger.WithName("ReconcileRoles") for idx, role := range s.Roles { - op, err := s.getReconcileOperationForRole(idx, role) + op, err := s.getReconcileOperationForRole(ctx, idx, role) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("Role Create", "Name", role.GetName(), "Namespace", role.GetNamespace()) - err = s.Client.Create(s.Ctx, role) + err = s.Client.Create(ctx, role) if err != nil { logger.Error(err, "Failed to create service", "Name", role.GetName(), "Namespace", role.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("Role Update", "Name", role.GetName(), "Namespace", role.GetNamespace()) - err = s.Client.Update(s.Ctx, role) + err = s.Client.Update(ctx, role) if err != nil { logger.Error(err, "Failed to update service", "Name", role.GetName(), "Namespace", role.GetNamespace()) return err @@ -300,21 +307,21 @@ func (s *ComponentRBACReconciler) reconcileRoles() error { return nil } -func (s *ComponentRBACReconciler) reconcileRoleBindings() error { +func (s *ComponentRBACReconciler) reconcileRoleBindings(ctx context.Context) error { logger := s.Logger.WithName("ReconcileRoles") for idx, roleBinding := range s.RoleBindings { - op, err := s.getReconcileOperationForRoleBinding(idx, roleBinding) + op, err := s.getReconcileOperationForRoleBinding(ctx, idx, roleBinding) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("RoleBinding Create", "Name", roleBinding.GetName(), "Namespace", roleBinding.GetNamespace()) - err = s.Client.Create(s.Ctx, roleBinding) + err = s.Client.Create(ctx, roleBinding) if err != nil { logger.Error(err, "Failed to create service", "Name", roleBinding.GetName(), "Namespace", roleBinding.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("RoleBinding Update", "Name", roleBinding.GetName(), "Namespace", roleBinding.GetNamespace()) - err = s.Client.Update(s.Ctx, roleBinding) + err = s.Client.Update(ctx, roleBinding) if err != nil { logger.Error(err, "Failed to update service", "Name", roleBinding.GetName(), "Namespace", roleBinding.GetNamespace()) return err @@ -329,21 +336,21 @@ func (s *ComponentRBACReconciler) reconcileRoleBindings() error { return nil } -func (s *ComponentRBACReconciler) reconcileServiceAccounts() error { +func (s *ComponentRBACReconciler) reconcileServiceAccounts(ctx context.Context) error { logger := s.Logger.WithName("ReconcileServiceAccounts") - for idx, serviceAccount := range s.ServiceAccounts { - op, err := s.getReconcileOperationForServiceAccount(idx, serviceAccount) + for _, serviceAccount := range s.ServiceAccounts { + op, err := s.getReconcileOperationForServiceAccount(ctx, serviceAccount) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("ServiceAccount Create", "Name", serviceAccount.GetName(), "Namespace", serviceAccount.GetNamespace()) - err = s.Client.Create(s.Ctx, serviceAccount) + err = s.Client.Create(ctx, serviceAccount) if err != nil { logger.Error(err, "Failed to create service", "Name", serviceAccount.GetName(), "Namespace", serviceAccount.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("ServiceAccount Update", "Name", serviceAccount.GetName(), "Namespace", serviceAccount.GetNamespace()) - err = s.Client.Update(s.Ctx, serviceAccount) + err = s.Client.Update(ctx, serviceAccount) if err != nil { logger.Error(err, "Failed to update service", "Name", serviceAccount.GetName(), "Namespace", serviceAccount.GetNamespace()) return err @@ -358,18 +365,18 @@ func (s *ComponentRBACReconciler) reconcileServiceAccounts() error { return nil } -func (s *ComponentRBACReconciler) Reconcile() error { - err := s.reconcileServiceAccounts() +func (s *ComponentRBACReconciler) Reconcile(ctx context.Context) error { + err := s.reconcileServiceAccounts(ctx) if err != nil { - return err + return fmt.Errorf("failed to reconcile service accounts: %w", err) } - err = s.reconcileRoles() + err = s.reconcileRoles(ctx) if err != nil { - return err + return fmt.Errorf("failed to reconcile role roles: %w", err) } - err = s.reconcileRoleBindings() + err = s.reconcileRoleBindings(ctx) if err != nil { - return err + return fmt.Errorf("failed to reconcile role bindings: %w", err) } return nil } diff --git a/operator/controllers/reconcilers/seldon/rbac_reconciller_test.go b/operator/controllers/reconcilers/seldon/rbac_reconciller_test.go index 69a919e978..394044c4de 100644 --- a/operator/controllers/reconcilers/seldon/rbac_reconciller_test.go +++ b/operator/controllers/reconcilers/seldon/rbac_reconciller_test.go @@ -69,10 +69,10 @@ func TestRBACReconcile(t *testing.T) { } client = testing2.NewFakeClient(scheme) sr := NewComponentRBACReconciler( - common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}, + common.ReconcilerConfig{Logger: logger, Client: client}, meta) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.TODO()) g.Expect(err).To(BeNil()) for _, roleName := range test.expectedRoles { svc := &auth.Role{} diff --git a/operator/controllers/reconcilers/seldon/runtime_reconciler.go b/operator/controllers/reconcilers/seldon/runtime_reconciler.go index cc145cb145..fffb1ceb6c 100644 --- a/operator/controllers/reconcilers/seldon/runtime_reconciler.go +++ b/operator/controllers/reconcilers/seldon/runtime_reconciler.go @@ -10,6 +10,7 @@ the Change License after the Change Date as each is defined in accordance with t package server import ( + "context" "fmt" "strconv" @@ -69,6 +70,7 @@ func replicaCalc(resourceCount, maxConsumers, partitions int32) int32 { } func validateScaleSpec( + ctx context.Context, component *mlopsv1alpha1.ComponentDefn, runtime *mlopsv1alpha1.SeldonRuntime, commonConfig common.ReconcilerConfig, @@ -79,7 +81,7 @@ func validateScaleSpec( resourceListObj client.ObjectList, countResources func(client.ObjectList) int, ) error { - ctx, clt, recorder := commonConfig.Ctx, commonConfig.Client, commonConfig.Recorder + clt, recorder := commonConfig.Client, commonConfig.Recorder var maxShardCountMultiplier int32 = DEFAULT_MAX_SHARD_COUNT_MULTIPLIER pipelineScaleConfig := runtime.Spec.Config.ScalingConfig.Pipelines @@ -126,6 +128,7 @@ func validateScaleSpec( } func ValidateScaleSpecPipelines( + ctx context.Context, component *mlopsv1alpha1.ComponentDefn, runtime *mlopsv1alpha1.SeldonRuntime, commonConfig common.ReconcilerConfig, @@ -134,6 +137,7 @@ func ValidateScaleSpecPipelines( reason string, ) error { return validateScaleSpec( + ctx, component, runtime, commonConfig, @@ -149,12 +153,14 @@ func ValidateScaleSpecPipelines( } func ValidateDataflowScaleSpec( + ctx context.Context, component *mlopsv1alpha1.ComponentDefn, runtime *mlopsv1alpha1.SeldonRuntime, commonConfig common.ReconcilerConfig, namespace *string, ) error { return ValidateScaleSpecPipelines( + ctx, component, runtime, commonConfig, @@ -165,12 +171,14 @@ func ValidateDataflowScaleSpec( } func ValidatePipelineGatewaySpec( + ctx context.Context, component *mlopsv1alpha1.ComponentDefn, runtime *mlopsv1alpha1.SeldonRuntime, commonConfig common.ReconcilerConfig, namespace *string, ) error { return ValidateScaleSpecPipelines( + ctx, component, runtime, commonConfig, @@ -181,12 +189,14 @@ func ValidatePipelineGatewaySpec( } func ValidateModelGatewaySpec( + ctx context.Context, component *mlopsv1alpha1.ComponentDefn, runtime *mlopsv1alpha1.SeldonRuntime, commonConfig common.ReconcilerConfig, namespace *string, ) error { return validateScaleSpec( + ctx, component, runtime, commonConfig, @@ -202,6 +212,7 @@ func ValidateModelGatewaySpec( } func ValidateComponent( + ctx context.Context, component *mlopsv1alpha1.ComponentDefn, runtime *mlopsv1alpha1.SeldonRuntime, commonConfig common.ReconcilerConfig, @@ -209,6 +220,7 @@ func ValidateComponent( ) error { if component.Name == mlopsv1alpha1.DataflowEngineName { return ValidateDataflowScaleSpec( + ctx, component, runtime, commonConfig, @@ -217,6 +229,7 @@ func ValidateComponent( } if component.Name == mlopsv1alpha1.PipelineGatewayName { return ValidatePipelineGatewaySpec( + ctx, component, runtime, commonConfig, @@ -225,6 +238,7 @@ func ValidateComponent( } if component.Name == mlopsv1alpha1.ModelGatewayName { return ValidateModelGatewaySpec( + ctx, component, runtime, commonConfig, @@ -252,6 +266,7 @@ func ComponentOverride(component *mlopsv1alpha1.ComponentDefn, override *mlopsv1 } func NewSeldonRuntimeReconciler( + ctx context.Context, runtime *mlopsv1alpha1.SeldonRuntime, commonConfig common.ReconcilerConfig, namespace string, @@ -280,6 +295,7 @@ func NewSeldonRuntimeReconciler( commonConfig.Logger.Info("Creating component", "name", c.Name) c, _ = ComponentOverride(c, override) err = ValidateComponent( + ctx, c, runtime, commonConfig, @@ -380,24 +396,17 @@ func (s *SeldonRuntimeReconciler) GetConditions() []*apis.Condition { return conditions } -func (s *SeldonRuntimeReconciler) Reconcile() error { - err := s.rbacReconciler.Reconcile() - if err != nil { - return err - } - err = s.configMapReconciler.Reconcile() - if err != nil { - return err - } - err = s.serviceReconciler.Reconcile() - if err != nil { - return err +func (s *SeldonRuntimeReconciler) Reconcile(ctx context.Context) error { + reconcilers := []common.Reconciler{ + s.rbacReconciler, s.configMapReconciler, s.serviceReconciler, } - for _, c := range s.componentReconcilers { - err := c.Reconcile() - if err != nil { + reconcilers = append(reconcilers, s.componentReconcilers...) + + for _, reconciler := range reconcilers { + if err := reconciler.Reconcile(ctx); err != nil { return err } } + return nil } diff --git a/operator/controllers/reconcilers/seldon/runtime_reconciler_test.go b/operator/controllers/reconcilers/seldon/runtime_reconciler_test.go index af78c91786..7d1d7490f6 100644 --- a/operator/controllers/reconcilers/seldon/runtime_reconciler_test.go +++ b/operator/controllers/reconcilers/seldon/runtime_reconciler_test.go @@ -160,9 +160,9 @@ func TestRuntimeReconcile(t *testing.T) { client = testing2.NewFakeClient(scheme) } g.Expect(err).To(BeNil()) - sr, err := NewSeldonRuntimeReconciler(test.runtime, common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}, constants.SeldonNamespace) + sr, err := NewSeldonRuntimeReconciler(context.Background(), test.runtime, common.ReconcilerConfig{Logger: logger, Client: client}, constants.SeldonNamespace) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.TODO()) if test.error { g.Expect(err).ToNot(BeNil()) } else { diff --git a/operator/controllers/reconcilers/seldon/service_reconciler.go b/operator/controllers/reconcilers/seldon/service_reconciler.go index 8877392d22..6cacf16c28 100644 --- a/operator/controllers/reconcilers/seldon/service_reconciler.go +++ b/operator/controllers/reconcilers/seldon/service_reconciler.go @@ -186,9 +186,11 @@ func getSchedulerService(meta metav1.ObjectMeta, serviceConfig mlopsv1alpha1.Ser return svc } -func (s *ComponentServiceReconciler) getReconcileOperation(idx int, svc *v1.Service) (constants.ReconcileOperation, error) { +func (s *ComponentServiceReconciler) getReconcileOperation(ctx context.Context, idx int, svc *v1.Service) (constants.ReconcileOperation, error) { found := &v1.Service{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: svc.GetName(), Namespace: svc.GetNamespace(), }, found) @@ -224,21 +226,21 @@ func (s *ComponentServiceReconciler) getReconcileOperation(idx int, svc *v1.Serv return constants.ReconcileUpdateNeeded, nil } -func (s *ComponentServiceReconciler) Reconcile() error { +func (s *ComponentServiceReconciler) Reconcile(ctx context.Context) error { logger := s.Logger.WithName("ServiceReconcile") for idx, svc := range s.Services { - op, err := s.getReconcileOperation(idx, svc) + op, err := s.getReconcileOperation(ctx, idx, svc) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("Service Create", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) - err = s.Client.Create(s.Ctx, svc) + err = s.Client.Create(ctx, svc) if err != nil { logger.Error(err, "Failed to create service", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("Service Update", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) - err = s.Client.Update(s.Ctx, svc) + err = s.Client.Update(ctx, svc) if err != nil { logger.Error(err, "Failed to update service", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) return err diff --git a/operator/controllers/reconcilers/seldon/service_reconciller_test.go b/operator/controllers/reconcilers/seldon/service_reconciller_test.go index 9ec1dfd5f6..41d8567b2f 100644 --- a/operator/controllers/reconcilers/seldon/service_reconciller_test.go +++ b/operator/controllers/reconcilers/seldon/service_reconciller_test.go @@ -92,13 +92,13 @@ func TestServiceReconcile(t *testing.T) { } client = testing2.NewFakeClient(scheme) sr := NewComponentServiceReconciler( - common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}, + common.ReconcilerConfig{Logger: logger, Client: client}, meta, test.serviceConfig, test.overrides, annotator) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.TODO()) g.Expect(err).To(BeNil()) for _, svcName := range test.expectedSvcNames { diff --git a/operator/controllers/reconcilers/seldon/statefulset_reconciler.go b/operator/controllers/reconcilers/seldon/statefulset_reconciler.go index 4bd81073e5..2047c09bd1 100644 --- a/operator/controllers/reconcilers/seldon/statefulset_reconciler.go +++ b/operator/controllers/reconcilers/seldon/statefulset_reconciler.go @@ -125,9 +125,11 @@ func (s *ComponentStatefulSetReconciler) GetLabelSelector() string { return fmt.Sprintf("%s=%s", constants.KubernetesNameLabelKey, s.Name) } -func (s *ComponentStatefulSetReconciler) GetReplicas() (int32, error) { +func (s *ComponentStatefulSetReconciler) GetReplicas(ctx context.Context) (int32, error) { found := &appsv1.StatefulSet{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: s.StatefulSet.GetName(), Namespace: s.StatefulSet.GetNamespace(), }, found) @@ -140,9 +142,11 @@ func (s *ComponentStatefulSetReconciler) GetReplicas() (int32, error) { return found.Status.Replicas, nil } -func (s *ComponentStatefulSetReconciler) getReconcileOperation() (constants.ReconcileOperation, error) { +func (s *ComponentStatefulSetReconciler) getReconcileOperation(ctx context.Context) (constants.ReconcileOperation, error) { found := &appsv1.StatefulSet{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: s.StatefulSet.GetName(), Namespace: s.StatefulSet.GetNamespace(), }, found) @@ -185,20 +189,20 @@ func (s *ComponentStatefulSetReconciler) getReconcileOperation() (constants.Reco return constants.ReconcileUpdateNeeded, nil } -func (s *ComponentStatefulSetReconciler) Reconcile() error { +func (s *ComponentStatefulSetReconciler) Reconcile(ctx context.Context) error { logger := s.Logger.WithName("StatefulSetReconcile") - op, err := s.getReconcileOperation() + op, err := s.getReconcileOperation(ctx) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("StatefulSet Create", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) - err = s.Client.Create(s.Ctx, s.StatefulSet) + err = s.Client.Create(ctx, s.StatefulSet) if err != nil { logger.Error(err, "Failed to create statefulset", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("StatefulSet Update", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) - err = s.Client.Update(s.Ctx, s.StatefulSet) + err = s.Client.Update(ctx, s.StatefulSet) if err != nil { logger.Error(err, "Failed to update statefulset", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) return err diff --git a/operator/controllers/reconcilers/seldon/statefulset_reconciler_test.go b/operator/controllers/reconcilers/seldon/statefulset_reconciler_test.go index f179bab152..450d97ce19 100644 --- a/operator/controllers/reconcilers/seldon/statefulset_reconciler_test.go +++ b/operator/controllers/reconcilers/seldon/statefulset_reconciler_test.go @@ -144,7 +144,7 @@ func TestStatefulSetReconcile(t *testing.T) { component, _ = ComponentOverride(component, test.override) sr, err := NewComponentStatefulSetReconciler( test.statefulSetName, - common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}, + common.ReconcilerConfig{Logger: logger, Client: client}, meta, *component.Replicas, component.PodSpec, @@ -154,7 +154,7 @@ func TestStatefulSetReconcile(t *testing.T) { test.seldonConfigMeta, annotator) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.TODO()) if test.error { g.Expect(err).ToNot(BeNil()) } else { diff --git a/operator/controllers/reconcilers/server/deployment_reconciler.go b/operator/controllers/reconcilers/server/deployment_reconciler.go index a883ded747..15fd444593 100644 --- a/operator/controllers/reconcilers/server/deployment_reconciler.go +++ b/operator/controllers/reconcilers/server/deployment_reconciler.go @@ -97,10 +97,12 @@ func toDeploymentTest( } } -func (s *ServerDeploymentReconciler) getReconcileOperation() (constants.ReconcileOperation, error) { +func (s *ServerDeploymentReconciler) getReconcileOperation(ctx context.Context) (constants.ReconcileOperation, error) { found := &appsv1.Deployment{} + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() err := s.Client.Get( - context.TODO(), + ctx, types.NamespacedName{ Name: s.Deployment.GetName(), Namespace: s.Deployment.GetNamespace(), @@ -146,21 +148,21 @@ func (s *ServerDeploymentReconciler) getReconcileOperation() (constants.Reconcil return constants.ReconcileUpdateNeeded, nil } -func (s *ServerDeploymentReconciler) Reconcile() error { +func (s *ServerDeploymentReconciler) Reconcile(ctx context.Context) error { logger := s.Logger.WithName("DeploymentReconcile") - op, err := s.getReconcileOperation() + op, err := s.getReconcileOperation(ctx) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("Deployment Create", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) - err = s.Client.Create(s.Ctx, s.Deployment) + err = s.Client.Create(ctx, s.Deployment) if err != nil { logger.Error(err, "Failed to create deployment", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("Deployment Update", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) - err = s.Client.Update(s.Ctx, s.Deployment) + err = s.Client.Update(ctx, s.Deployment) if err != nil { logger.Error(err, "Failed to update deployment", "Name", s.Deployment.GetName(), "Namespace", s.Deployment.GetNamespace()) return err diff --git a/operator/controllers/reconcilers/server/deployment_reconciler_test.go b/operator/controllers/reconcilers/server/deployment_reconciler_test.go index 0c36760e14..05b95eae83 100644 --- a/operator/controllers/reconcilers/server/deployment_reconciler_test.go +++ b/operator/controllers/reconcilers/server/deployment_reconciler_test.go @@ -260,7 +260,6 @@ func TestServerDeploymentReconcile(t *testing.T) { annotator := patch.NewAnnotator(constants.LastAppliedConfig) r := NewServerDeploymentReconciler( common.ReconcilerConfig{ - Ctx: context.TODO(), Logger: logger, Client: client}, test.metaServer, @@ -269,10 +268,10 @@ func TestServerDeploymentReconcile(t *testing.T) { test.deploymentStrategy, test.metaServerConfig, annotator) - rop, err := r.getReconcileOperation() + rop, err := r.getReconcileOperation(context.Background()) g.Expect(rop).To(Equal(test.expectedReconcileOp)) g.Expect(err).To(BeNil()) - err = r.Reconcile() + err = r.Reconcile(context.Background()) g.Expect(err).To(BeNil()) found := &appsv1.Deployment{} err = client.Get(context.TODO(), types.NamespacedName{ @@ -423,7 +422,6 @@ func TestLabelsAnnotationsDeployment(t *testing.T) { annotator := patch.NewAnnotator(constants.LastAppliedConfig) r := NewServerDeploymentReconciler( common.ReconcilerConfig{ - Ctx: context.TODO(), Logger: logger, Client: client}, test.metaServer, diff --git a/operator/controllers/reconcilers/server/server_deploy_reconciler.go b/operator/controllers/reconcilers/server/server_deploy_reconciler.go index 74f033caa4..64e62792f8 100644 --- a/operator/controllers/reconcilers/server/server_deploy_reconciler.go +++ b/operator/controllers/reconcilers/server/server_deploy_reconciler.go @@ -10,6 +10,8 @@ the Change License after the Change Date as each is defined in accordance with t package server import ( + "context" + "github.com/banzaicloud/k8s-objectmatcher/patch" "knative.dev/pkg/apis" "sigs.k8s.io/controller-runtime/pkg/client" @@ -54,8 +56,8 @@ func (s *ServerReconcilerWithDeployment) GetLabelSelector() string { return s.DeploymentReconciler.(common.LabelHandler).GetLabelSelector() } -func (s *ServerReconcilerWithDeployment) GetReplicas() (int32, error) { - return s.DeploymentReconciler.(common.ReplicaHandler).GetReplicas() +func (s *ServerReconcilerWithDeployment) GetReplicas(ctx context.Context) (int32, error) { + return s.DeploymentReconciler.(common.ReplicaHandler).GetReplicas(ctx) } func (s *ServerReconcilerWithDeployment) GetResources() []client.Object { @@ -68,8 +70,8 @@ func (s *ServerReconcilerWithDeployment) GetConditions() []*apis.Condition { return conditions } -func (s *ServerReconcilerWithDeployment) Reconcile() error { - err := s.DeploymentReconciler.Reconcile() +func (s *ServerReconcilerWithDeployment) Reconcile(ctx context.Context) error { + err := s.DeploymentReconciler.Reconcile(ctx) if err != nil { return err } diff --git a/operator/controllers/reconcilers/server/server_deploy_reconciler_test.go b/operator/controllers/reconcilers/server/server_deploy_reconciler_test.go index ac50e3c3a5..2f2236ae94 100644 --- a/operator/controllers/reconcilers/server/server_deploy_reconciler_test.go +++ b/operator/controllers/reconcilers/server/server_deploy_reconciler_test.go @@ -87,9 +87,9 @@ func TestServerReconcileWithDeployment(t *testing.T) { client = testing2.NewFakeClient(scheme) } g.Expect(err).To(BeNil()) - sr, err := NewServerReconcilerWithDeployment(test.server, common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}) + sr, err := NewServerReconcilerWithDeployment(test.server, common.ReconcilerConfig{Logger: logger, Client: client}) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.Background()) if test.error { g.Expect(err).ToNot(BeNil()) } else { @@ -202,7 +202,7 @@ func TestNewServerReconcilerWithDeployment(t *testing.T) { client = testing2.NewFakeClient(scheme) } g.Expect(err).To(BeNil()) - _, err = NewServerReconcilerWithDeployment(test.server, common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}) + _, err = NewServerReconcilerWithDeployment(test.server, common.ReconcilerConfig{Logger: logger, Client: client}) if test.error { g.Expect(err).ToNot(BeNil()) } else { diff --git a/operator/controllers/reconcilers/server/server_reconciler.go b/operator/controllers/reconcilers/server/server_reconciler.go index face497f0f..653ce3b43e 100644 --- a/operator/controllers/reconcilers/server/server_reconciler.go +++ b/operator/controllers/reconcilers/server/server_reconciler.go @@ -10,6 +10,7 @@ the Change License after the Change Date as each is defined in accordance with t package server import ( + "context" "fmt" "strings" @@ -65,8 +66,8 @@ func (s *ServerReconciler) GetLabelSelector() string { return s.StatefulSetReconciler.(common.LabelHandler).GetLabelSelector() } -func (s *ServerReconciler) GetReplicas() (int32, error) { - return s.StatefulSetReconciler.(common.ReplicaHandler).GetReplicas() +func (s *ServerReconciler) GetReplicas(ctx context.Context) (int32, error) { + return s.StatefulSetReconciler.(common.ReplicaHandler).GetReplicas(ctx) } func (s *ServerReconciler) GetResources() []client.Object { @@ -81,14 +82,14 @@ func (s *ServerReconciler) GetConditions() []*apis.Condition { return conditions } -func (s *ServerReconciler) Reconcile() error { +func (s *ServerReconciler) Reconcile(ctx context.Context) error { // Reconcile Services - err := s.ServiceReconciler.Reconcile() + err := s.ServiceReconciler.Reconcile(ctx) if err != nil { return err } // Reconcile StatefulSet - err = s.StatefulSetReconciler.Reconcile() + err = s.StatefulSetReconciler.Reconcile(ctx) if err != nil { return err } diff --git a/operator/controllers/reconcilers/server/server_reconciler_test.go b/operator/controllers/reconcilers/server/server_reconciler_test.go index 4113cc65e4..75670cf620 100644 --- a/operator/controllers/reconcilers/server/server_reconciler_test.go +++ b/operator/controllers/reconcilers/server/server_reconciler_test.go @@ -128,9 +128,9 @@ func TestServerReconcile(t *testing.T) { client = testing2.NewFakeClient(scheme) } g.Expect(err).To(BeNil()) - sr, err := NewServerReconciler(test.server, common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}) + sr, err := NewServerReconciler(test.server, common.ReconcilerConfig{Logger: logger, Client: client}) g.Expect(err).To(BeNil()) - err = sr.Reconcile() + err = sr.Reconcile(context.Background()) if test.error { g.Expect(err).ToNot(BeNil()) } else { @@ -251,7 +251,7 @@ func TestNewServerReconciler(t *testing.T) { client = testing2.NewFakeClient(scheme) } g.Expect(err).To(BeNil()) - _, err = NewServerReconciler(test.server, common.ReconcilerConfig{Ctx: context.TODO(), Logger: logger, Client: client}) + _, err = NewServerReconciler(test.server, common.ReconcilerConfig{Logger: logger, Client: client}) if test.error { g.Expect(err).ToNot(BeNil()) } else { diff --git a/operator/controllers/reconcilers/server/service_reconciler.go b/operator/controllers/reconcilers/server/service_reconciler.go index 0c53e6abc1..341509f79e 100644 --- a/operator/controllers/reconcilers/server/service_reconciler.go +++ b/operator/controllers/reconcilers/server/service_reconciler.go @@ -107,9 +107,11 @@ func toServices(meta metav1.ObjectMeta, replicas int) []*v1.Service { return svcs } -func (s *ServerServiceReconciler) getReconcileOperation(idx int, svc *v1.Service) (constants.ReconcileOperation, error) { +func (s *ServerServiceReconciler) getReconcileOperation(ctx context.Context, idx int, svc *v1.Service) (constants.ReconcileOperation, error) { found := &v1.Service{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: svc.GetName(), Namespace: svc.GetNamespace(), }, found) @@ -131,11 +133,11 @@ func (s *ServerServiceReconciler) getReconcileOperation(idx int, svc *v1.Service // Get the expected number of replicas of server specific services that currently should exist // This is found by extracting the annotation added to the main svc -func (s *ServerServiceReconciler) getCurrentSvcReplicas() (int, error) { +func (s *ServerServiceReconciler) getCurrentSvcReplicas(ctx context.Context) (int, error) { founds := &v1.ServiceList{} matchingLabel := client.MatchingLabels{constants.ServerReplicaLabelKey: s.meta.Name} namespace := client.InNamespace(s.Namespace) - err := s.Client.List(s.Ctx, founds, matchingLabel, namespace) + err := s.Client.List(ctx, founds, matchingLabel, namespace) if err != nil { if errors.IsNotFound(err) { return 0, nil @@ -146,8 +148,8 @@ func (s *ServerServiceReconciler) getCurrentSvcReplicas() (int, error) { } // Delete svc replicas not needed -func (s *ServerServiceReconciler) removeExtraSvcReplicas() error { - existingReplicas, err := s.getCurrentSvcReplicas() +func (s *ServerServiceReconciler) removeExtraSvcReplicas(ctx context.Context) error { + existingReplicas, err := s.getCurrentSvcReplicas(ctx) if err != nil { return err } @@ -155,7 +157,7 @@ func (s *ServerServiceReconciler) removeExtraSvcReplicas() error { if existingReplicas > numReplicas { svcsNow := toServices(s.meta, existingReplicas) for i := numReplicas; i < existingReplicas; i++ { - err = s.Client.Delete(s.Ctx, svcsNow[i]) + err = s.Client.Delete(ctx, svcsNow[i]) if err != nil { return err } @@ -164,25 +166,25 @@ func (s *ServerServiceReconciler) removeExtraSvcReplicas() error { return nil } -func (s *ServerServiceReconciler) Reconcile() error { +func (s *ServerServiceReconciler) Reconcile(ctx context.Context) error { logger := s.Logger.WithName("ServiceReconcile") - err := s.removeExtraSvcReplicas() + err := s.removeExtraSvcReplicas(ctx) if err != nil { return err } for idx, svc := range s.Services { - op, err := s.getReconcileOperation(idx, svc) + op, err := s.getReconcileOperation(ctx, idx, svc) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("Service Create", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) - err = s.Client.Create(s.Ctx, svc) + err = s.Client.Create(ctx, svc) if err != nil { logger.Error(err, "Failed to create service", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("Service Update", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) - err = s.Client.Update(s.Ctx, svc) + err = s.Client.Update(ctx, svc) if err != nil { logger.Error(err, "Failed to update service", "Name", svc.GetName(), "Namespace", svc.GetNamespace()) return err diff --git a/operator/controllers/reconcilers/server/service_reconciler_test.go b/operator/controllers/reconcilers/server/service_reconciler_test.go index a3c377c2d1..77a38d34b3 100644 --- a/operator/controllers/reconcilers/server/service_reconciler_test.go +++ b/operator/controllers/reconcilers/server/service_reconciler_test.go @@ -99,7 +99,7 @@ func TestServiceReconcile(t *testing.T) { tests := []test{ { name: "Create", - reconcilerTime2: NewServerServiceReconciler(common.ReconcilerConfig{Ctx: context.Background(), Logger: logger}, + reconcilerTime2: NewServerServiceReconciler(common.ReconcilerConfig{Logger: logger}, metav1.ObjectMeta{ Name: "foo", Namespace: "default", @@ -110,7 +110,7 @@ func TestServiceReconcile(t *testing.T) { }, { name: "Existing svcs from previous reconcile", - reconcilerTime1: NewServerServiceReconciler(common.ReconcilerConfig{Ctx: context.Background(), Logger: logger}, + reconcilerTime1: NewServerServiceReconciler(common.ReconcilerConfig{Logger: logger}, metav1.ObjectMeta{ Name: "foo", Namespace: "default", @@ -118,7 +118,7 @@ func TestServiceReconcile(t *testing.T) { &mlopsv1alpha1.ScalingSpec{ Replicas: getIntPtr(1), }), - reconcilerTime2: NewServerServiceReconciler(common.ReconcilerConfig{Ctx: context.Background(), Logger: logger}, + reconcilerTime2: NewServerServiceReconciler(common.ReconcilerConfig{Logger: logger}, metav1.ObjectMeta{ Name: "foo", Namespace: "default", @@ -129,7 +129,7 @@ func TestServiceReconcile(t *testing.T) { }, { name: "decrease in number of replicas", - reconcilerTime1: NewServerServiceReconciler(common.ReconcilerConfig{Ctx: context.Background(), Logger: logger}, + reconcilerTime1: NewServerServiceReconciler(common.ReconcilerConfig{Logger: logger}, metav1.ObjectMeta{ Name: "foo", Namespace: "default", @@ -137,7 +137,7 @@ func TestServiceReconcile(t *testing.T) { &mlopsv1alpha1.ScalingSpec{ Replicas: getIntPtr(3), }), - reconcilerTime2: NewServerServiceReconciler(common.ReconcilerConfig{Ctx: context.Background(), Logger: logger}, + reconcilerTime2: NewServerServiceReconciler(common.ReconcilerConfig{Logger: logger}, metav1.ObjectMeta{ Name: "foo", Namespace: "default", @@ -169,7 +169,7 @@ func TestServiceReconcile(t *testing.T) { } test.reconcilerTime2.ReconcilerConfig.Client = client - err = test.reconcilerTime2.Reconcile() + err = test.reconcilerTime2.Reconcile(context.TODO()) g.Expect(err).To(BeNil()) for _, svc := range test.reconcilerTime2.Services { found := &v1.Service{} diff --git a/operator/controllers/reconcilers/server/statefulset_reconciler.go b/operator/controllers/reconcilers/server/statefulset_reconciler.go index f2aec17544..b3d87dfc25 100644 --- a/operator/controllers/reconcilers/server/statefulset_reconciler.go +++ b/operator/controllers/reconcilers/server/statefulset_reconciler.go @@ -111,9 +111,11 @@ func (s *ServerStatefulSetReconciler) GetLabelSelector() string { return fmt.Sprintf("%s=%s", constants.ServerLabelNameKey, s.StatefulSet.GetName()) } -func (s *ServerStatefulSetReconciler) GetReplicas() (int32, error) { +func (s *ServerStatefulSetReconciler) GetReplicas(ctx context.Context) (int32, error) { found := &appsv1.StatefulSet{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: s.StatefulSet.GetName(), Namespace: s.StatefulSet.GetNamespace(), }, found) @@ -126,9 +128,11 @@ func (s *ServerStatefulSetReconciler) GetReplicas() (int32, error) { return found.Status.Replicas, nil } -func (s *ServerStatefulSetReconciler) getReconcileOperation() (constants.ReconcileOperation, error) { +func (s *ServerStatefulSetReconciler) getReconcileOperation(ctx context.Context) (constants.ReconcileOperation, error) { found := &appsv1.StatefulSet{} - err := s.Client.Get(context.TODO(), types.NamespacedName{ + ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout) + defer cancel() + err := s.Client.Get(ctx, types.NamespacedName{ Name: s.StatefulSet.GetName(), Namespace: s.StatefulSet.GetNamespace(), }, found) @@ -172,20 +176,20 @@ func (s *ServerStatefulSetReconciler) getReconcileOperation() (constants.Reconci return constants.ReconcileUpdateNeeded, nil } -func (s *ServerStatefulSetReconciler) Reconcile() error { +func (s *ServerStatefulSetReconciler) Reconcile(ctx context.Context) error { logger := s.Logger.WithName("StatefulSetReconcile") - op, err := s.getReconcileOperation() + op, err := s.getReconcileOperation(ctx) switch op { case constants.ReconcileCreateNeeded: logger.V(1).Info("StatefulSet Create", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) - err = s.Client.Create(s.Ctx, s.StatefulSet) + err = s.Client.Create(ctx, s.StatefulSet) if err != nil { logger.Error(err, "Failed to create statefulset", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) return err } case constants.ReconcileUpdateNeeded: logger.V(1).Info("StatefulSet Update", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) - err = s.Client.Update(s.Ctx, s.StatefulSet) + err = s.Client.Update(ctx, s.StatefulSet) if err != nil { logger.Error(err, "Failed to update statefulset", "Name", s.StatefulSet.GetName(), "Namespace", s.StatefulSet.GetNamespace()) return err diff --git a/operator/controllers/reconcilers/server/statefulset_reconciler_test.go b/operator/controllers/reconcilers/server/statefulset_reconciler_test.go index 7b2167ee5b..cae5f76602 100644 --- a/operator/controllers/reconcilers/server/statefulset_reconciler_test.go +++ b/operator/controllers/reconcilers/server/statefulset_reconciler_test.go @@ -332,7 +332,6 @@ func TestStatefulSetReconcile(t *testing.T) { annotator := patch.NewAnnotator(constants.LastAppliedConfig) r := NewServerStatefulSetReconciler( common.ReconcilerConfig{ - Ctx: context.TODO(), Logger: logger, Client: client}, test.metaServer, @@ -342,10 +341,10 @@ func TestStatefulSetReconcile(t *testing.T) { test.statefulSetPersistentVolumeClaimRetentionPolicy, test.metaServerConfig, annotator) - rop, err := r.getReconcileOperation() + rop, err := r.getReconcileOperation(context.Background()) g.Expect(rop).To(Equal(test.expectedReconcileOp)) g.Expect(err).To(BeNil()) - err = r.Reconcile() + err = r.Reconcile(context.TODO()) g.Expect(err).To(BeNil()) found := &appsv1.StatefulSet{} err = client.Get(context.TODO(), types.NamespacedName{ @@ -633,7 +632,6 @@ func TestLabelsAnnotations(t *testing.T) { annotator := patch.NewAnnotator(constants.LastAppliedConfig) r := NewServerStatefulSetReconciler( common.ReconcilerConfig{ - Ctx: context.TODO(), Logger: logger, Client: client}, test.metaServer, diff --git a/operator/pkg/constants/constants.go b/operator/pkg/constants/constants.go index 6372b394a1..d2dffaa93c 100644 --- a/operator/pkg/constants/constants.go +++ b/operator/pkg/constants/constants.go @@ -59,6 +59,7 @@ const ( // this is a constant that can be used to set the timeout for k8s api calls // currently it can be used for a series of calls in a single logical operation // which is expected to be completed in this amount of time (as opposed to a single call) + K8sAPISingleCallTimeout = 10 * time.Second K8sAPICallsTxTimeout = 2 * time.Minute ControlPlaneExecTimeOut = 5 * time.Minute ReconcileTimeout = 5 * time.Minute diff --git a/operator/scheduler/backoff.go b/operator/scheduler/backoff.go new file mode 100644 index 0000000000..a6ad229cfc --- /dev/null +++ b/operator/scheduler/backoff.go @@ -0,0 +1,56 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. + +Use of this software is governed BY +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. +*/ + +package scheduler + +import ( + "context" + "time" + + v4backoff "github.com/cenkalti/backoff/v4" + "github.com/go-logr/logr" + "google.golang.org/grpc" + + "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" +) + +func retryFnConstBackoff(fn func() error, log func(err error, duration time.Duration)) error { + return v4backoff.RetryNotify(func() error { + return fn() + }, v4backoff.WithMaxRetries(v4backoff.NewConstantBackOff(schedulerConstantBackoff), schedulerConnectMaxRetries), log) +} + +func retryFnExpBackoff( + fn func(context context.Context, grpcClient scheduler.SchedulerClient, namespace string) error, + conn *grpc.ClientConn, namespace string, logger logr.Logger, +) error { + logger.Info("Retrying to connect", "namespace", namespace) + logFailure := func(err error, delay time.Duration) { + logger.Error(err, "Scheduler not ready") + } + backOffExp := getClientExponentialBackoff() + fnWithArgs := func() error { + grpcClient := scheduler.NewSchedulerClient(conn) + return fn(context.Background(), grpcClient, namespace) + } + err := v4backoff.RetryNotify(fnWithArgs, backOffExp, logFailure) + if err != nil { + logger.Error(err, "Failed to connect to scheduler", "namespace", namespace) + return err + } + return nil +} + +func getClientExponentialBackoff() *v4backoff.ExponentialBackOff { + backOffExp := v4backoff.NewExponentialBackOff() + backOffExp.MaxElapsedTime = backoffMaxElapsedTime + backOffExp.MaxInterval = backOffMaxInterval + backOffExp.InitialInterval = backOffInitialInterval + return backOffExp +} diff --git a/operator/scheduler/client.go b/operator/scheduler/client.go index 8d9c196af1..28abb6ca51 100644 --- a/operator/scheduler/client.go +++ b/operator/scheduler/client.go @@ -17,7 +17,6 @@ import ( "sync" "time" - "github.com/cenkalti/backoff/v4" "github.com/go-logr/logr" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "google.golang.org/grpc" @@ -35,9 +34,15 @@ import ( ) const ( - // these 2 constants in combination with the backoff exponential function will give us a max backoff of 13.5 minutes - schedulerConnectMaxRetries = 100 - schedulerConnectBackoffScalar = 200 * time.Millisecond + // schedulerConstantBackoff time to backoff between retries. We use a constant backoff as the k8s controller + // reconciller will handle exponential backoff and there is a risk of using max retries with too high a value + // and we end up backoff retrying forever. + schedulerConstantBackoff = 2 * time.Second + // schedulerConnectMaxRetries maximum amount of retries to attempt with failed gRPC requests to scheduler + schedulerConnectMaxRetries = 2 +) + +const ( // these keep alive settings need to match the scheduler counterpart in scheduler/pkg/util/constants.go clientKeepAliveTime = 60 * time.Second clientKeepAliveTimeout = 2 * time.Second @@ -98,7 +103,7 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC // Subscribe the event streams from scheduler go func() { for { - err := retryFn(s.SubscribeModelEvents, conn, namespace, s.logger.WithName("SubscribeModelEvents")) + err := retryFnExpBackoff(s.SubscribeModelEvents, conn, namespace, s.logger.WithName("SubscribeModelEvents")) if err != nil { s.logger.Error(err, "Subscribe ended for model events", "namespace", namespace) } else { @@ -108,7 +113,7 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC }() go func() { for { - err := retryFn(s.SubscribeServerEvents, conn, namespace, s.logger.WithName("SubscribeServerEvents")) + err := retryFnExpBackoff(s.SubscribeServerEvents, conn, namespace, s.logger.WithName("SubscribeServerEvents")) if err != nil { s.logger.Error(err, "Subscribe ended for server events", "namespace", namespace) } else { @@ -118,7 +123,7 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC }() go func() { for { - err := retryFn(s.SubscribePipelineEvents, conn, namespace, s.logger.WithName("SubscribePipelineEvents")) + err := retryFnExpBackoff(s.SubscribePipelineEvents, conn, namespace, s.logger.WithName("SubscribePipelineEvents")) if err != nil { s.logger.Error(err, "Subscribe ended for pipeline events", "namespace", namespace) } else { @@ -128,7 +133,7 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC }() go func() { for { - err := retryFn(s.SubscribeExperimentEvents, conn, namespace, s.logger.WithName("SubscribeExperimentEvents")) + err := retryFnExpBackoff(s.SubscribeExperimentEvents, conn, namespace, s.logger.WithName("SubscribeExperimentEvents")) if err != nil { s.logger.Error(err, "Subscribe ended for experiment events", "namespace", namespace) } else { @@ -138,7 +143,7 @@ func (s *SchedulerClient) startEventHanders(namespace string, conn *grpc.ClientC }() go func() { for { - err := retryFn(s.SubscribeControlPlaneEvents, conn, namespace, s.logger.WithName("SubscribeControlPlaneEvents")) + err := retryFnExpBackoff(s.SubscribeControlPlaneEvents, conn, namespace, s.logger.WithName("SubscribeControlPlaneEvents")) if err != nil { s.logger.Error(err, "Subscribe ended for control plane events", "namespace", namespace) } else { @@ -323,32 +328,3 @@ func (s *SchedulerClient) checkErrorRetryable(resource string, resourceName stri return false } } - -func retryFn( - fn func(context context.Context, grpcClient scheduler.SchedulerClient, namespace string) error, - conn *grpc.ClientConn, namespace string, logger logr.Logger, -) error { - logger.Info("Retrying to connect", "namespace", namespace) - logFailure := func(err error, delay time.Duration) { - logger.Error(err, "Scheduler not ready") - } - backOffExp := getClientExponentialBackoff() - fnWithArgs := func() error { - grpcClient := scheduler.NewSchedulerClient(conn) - return fn(context.Background(), grpcClient, namespace) - } - err := backoff.RetryNotify(fnWithArgs, backOffExp, logFailure) - if err != nil { - logger.Error(err, "Failed to connect to scheduler", "namespace", namespace) - return err - } - return nil -} - -func getClientExponentialBackoff() *backoff.ExponentialBackOff { - backOffExp := backoff.NewExponentialBackOff() - backOffExp.MaxElapsedTime = backoffMaxElapsedTime - backOffExp.MaxInterval = backOffMaxInterval - backOffExp.InitialInterval = backOffInitialInterval - return backOffExp -} diff --git a/operator/scheduler/control_plane.go b/operator/scheduler/control_plane.go index a8c339bb06..eab3470c5f 100644 --- a/operator/scheduler/control_plane.go +++ b/operator/scheduler/control_plane.go @@ -15,8 +15,7 @@ import ( "io" "time" - "github.com/cenkalti/backoff/v4" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + v4backoff "github.com/cenkalti/backoff/v4" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -28,17 +27,16 @@ import ( func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error { logger := s.logger.WithName("SubscribeControlPlaneEvents") - var cancel context.CancelFunc - ctx, cancel = context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) defer cancel() + stream, err := grpcClient.SubscribeControlPlane( ctx, &scheduler.ControlPlaneSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) + if err != nil { - return err + return fmt.Errorf("gRPC SubscribeControlPlane failed: %w", err) } for { @@ -56,7 +54,7 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC } go func() { - err := backoff.Retry(func() error { + err := v4backoff.Retry(func() error { // in general, we could have also handled timeout via a context with timeout // but we want to handle the timeout in a more controlled way and not depending on the other side _, err = execWithTimeout(ctx, fn, constants.ControlPlaneExecTimeOut) @@ -65,7 +63,7 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC return err } return nil - }, backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(time.Minute*10))) + }, v4backoff.NewExponentialBackOff(v4backoff.WithMaxElapsedTime(time.Minute*10))) if err != nil { logger.Error(err, "Failed to handle event", "namespace", namespace, "event", event) return diff --git a/operator/scheduler/experiment.go b/operator/scheduler/experiment.go index ea9376925c..3fa7393a3d 100644 --- a/operator/scheduler/experiment.go +++ b/operator/scheduler/experiment.go @@ -11,9 +11,10 @@ package scheduler import ( "context" + "fmt" "io" + "time" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/util/retry" @@ -41,12 +42,20 @@ func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alp Experiment: experiment.AsSchedulerExperimentRequest(), } logger.Info("Start", "experiment name", experiment.Name) - _, err = grpcClient.StartExperiment( - ctx, - req, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), - ) + + err = retryFnConstBackoff(func() error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + _, err := grpcClient.StartExperiment( + ctx, + req, + ) + return err + }, func(err error, duration time.Duration) { + logger.Error(err, "StartExperiment failed, retrying", "duration", duration) + }) + return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err } @@ -64,12 +73,20 @@ func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alph Name: experiment.Name, } logger.Info("Stop", "experiment name", experiment.Name) - _, err = grpcClient.StopExperiment( - ctx, - req, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), - ) + + err = retryFnConstBackoff(func() error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + _, err := grpcClient.StopExperiment( + ctx, + req, + ) + return err + }, func(err error, duration time.Duration) { + logger.Error(err, "StopExperiment failed, retrying", "duration", duration) + }) + return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err } @@ -77,14 +94,16 @@ func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alph func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error { logger := s.logger.WithName("SubscribeExperimentEvents") + ctx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := grpcClient.SubscribeExperimentStatus( ctx, &scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) + if err != nil { - return err + return fmt.Errorf("gRPC SubscribeExperimentStatus failed: %w", err) } for { diff --git a/operator/scheduler/model.go b/operator/scheduler/model.go index 8d2c1462ee..12c3659320 100644 --- a/operator/scheduler/model.go +++ b/operator/scheduler/model.go @@ -13,9 +13,9 @@ import ( "context" "fmt" "io" + "time" "github.com/go-logr/logr" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -58,12 +58,19 @@ func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model, Model: md, } - _, err = grpcClient.LoadModel( - ctx, - &loadModelRequest, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), - ) + err = retryFnConstBackoff(func() error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + _, err := grpcClient.LoadModel( + ctx, + &loadModelRequest, + ) + return err + }, func(err error, duration time.Duration) { + logger.Error(err, "LoadModel failed, retrying", "duration", duration) + }) + if err != nil { return s.checkErrorRetryable(model.Kind, model.Name, err), err } @@ -100,12 +107,19 @@ func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model }, } - _, err = grpcClient.UnloadModel( - ctx, - modelRef, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), - ) + err = retryFnConstBackoff(func() error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + _, err := grpcClient.UnloadModel( + ctx, + modelRef, + ) + return err + }, func(err error, duration time.Duration) { + logger.Error(err, "UnloadModel failed, retrying", "duration", duration) + }) + if err != nil { return s.checkErrorRetryable(model.Kind, model.Name, err), err } @@ -115,14 +129,16 @@ func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error { logger := s.logger.WithName("SubscribeModelEvents") + ctx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := grpcClient.SubscribeModelStatus( ctx, &scheduler.ModelSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) + if err != nil { - return err + return fmt.Errorf("gRPC SubscribeModelStatus failed: %w", err) } for { diff --git a/operator/scheduler/pipeline.go b/operator/scheduler/pipeline.go index 4f673d9a49..45bbaaaa0d 100644 --- a/operator/scheduler/pipeline.go +++ b/operator/scheduler/pipeline.go @@ -13,9 +13,9 @@ import ( "context" "fmt" "io" + "time" "github.com/go-logr/logr" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" v1 "k8s.io/api/core/v1" api_errors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/util/retry" @@ -43,12 +43,20 @@ func (s *SchedulerClient) LoadPipeline(ctx context.Context, pipeline *v1alpha1.P Pipeline: pipeline.AsSchedulerPipeline(), } logger.Info("Load", "pipeline name", pipeline.Name) - _, err = grpcClient.LoadPipeline( - ctx, - &req, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), - ) + + err = retryFnConstBackoff(func() error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + _, err := grpcClient.LoadPipeline( + ctx, + &req, + ) + return err + }, func(err error, duration time.Duration) { + logger.Error(err, "LoadPipeline failed, retrying", "duration", duration) + }) + return s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err), err } @@ -63,15 +71,24 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1 Name: pipeline.Name, } logger.Info("Unload", "pipeline name", pipeline.Name) - _, err = grpcClient.UnloadPipeline( - ctx, - &req, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), - ) + + err = retryFnConstBackoff(func() error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + _, err = grpcClient.UnloadPipeline( + ctx, + &req, + ) + return err + }, func(err error, duration time.Duration) { + logger.Error(err, "UnloadPipeline failed, retrying", "duration", duration) + }) + if err != nil { return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err) } + pipeline.Status.CreateAndSetCondition( v1alpha1.PipelineReady, false, @@ -86,14 +103,16 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1 func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error { logger := s.logger.WithName("SubscribePipelineEvents") + ctx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := grpcClient.SubscribePipelineStatus( ctx, &scheduler.PipelineSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) + if err != nil { - return err + return fmt.Errorf("gRPC SubscribePipelineStatus failed: %v", err) } for { diff --git a/operator/scheduler/server.go b/operator/scheduler/server.go index ede9a34f7d..c243fc55a4 100644 --- a/operator/scheduler/server.go +++ b/operator/scheduler/server.go @@ -14,8 +14,8 @@ import ( e "errors" "fmt" "io" + "time" - grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/util/retry" @@ -90,21 +90,27 @@ func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler return fmt.Errorf("all servers failed scaling spec check: %w", errs) } - request := &scheduler.ServerNotifyRequest{ - Servers: requests, - IsFirstSync: isFirstSync, - } - _, err := grpcClient.ServerNotify( - ctx, - request, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), - ) - if err != nil { - logger.Error(err, "Failed to send notify server to scheduler") + err := retryFnConstBackoff(func() error { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + + _, err := grpcClient.ServerNotify( + ctx, + &scheduler.ServerNotifyRequest{ + Servers: requests, + IsFirstSync: isFirstSync, + }, + ) return err + }, func(err error, duration time.Duration) { + logger.Error(err, "gRPC ServerNotify failed, retrying", "duration", duration) + }) + + if err != nil { + return fmt.Errorf("server notify failed: %w", err) } - logger.V(1).Info("Sent notify server to scheduler", "servers", len(servers), "isFirstSync", isFirstSync) + + logger.Info("Sent notify server to scheduler", "servers", len(servers), "isFirstSync", isFirstSync) return nil } @@ -112,14 +118,16 @@ func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient scheduler.SchedulerClient, namespace string) error { logger := s.logger.WithName("SubscribeServerEvents") + ctx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := grpcClient.SubscribeServerStatus( ctx, &scheduler.ServerSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(schedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) + if err != nil { - return err + return fmt.Errorf("SubscribeServerStatus failed: %w", err) } for { diff --git a/scheduler/pkg/server/server.go b/scheduler/pkg/server/server.go index 08e9577d86..47faebbbdc 100644 --- a/scheduler/pkg/server/server.go +++ b/scheduler/pkg/server/server.go @@ -420,6 +420,8 @@ func (s *SchedulerServer) handleScalingConfigChanges() { func (s *SchedulerServer) ServerNotify(ctx context.Context, req *pb.ServerNotifyRequest) (*pb.ServerNotifyResponse, error) { logger := s.logger.WithField("func", "ServerNotify") + logger.Info("Received ServerNotify request", "req", req) + // numExpectedReplicas is only used when we are doing the first sync numExpectedReplicas := uint(0) for _, server := range req.GetServers() {