Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion operator/apis/mlops/v1alpha1/seldonconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ func GetSeldonConfigForSeldonRuntime(seldonConfigName string, client client.Clie
return nil, fmt.Errorf("SeldonConfig not specified and is required")
}
sc := SeldonConfig{}
err := client.Get(context.TODO(), types.NamespacedName{Name: seldonConfigName, Namespace: constants.SeldonNamespace}, &sc)
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPISingleCallTimeout)
defer cancel()
err := client.Get(ctx, types.NamespacedName{Name: seldonConfigName, Namespace: constants.SeldonNamespace}, &sc)
return &sc, err
}

Expand Down
4 changes: 3 additions & 1 deletion operator/apis/mlops/v1alpha1/serverconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func GetServerConfigForServer(serverConfig string, client client.Client) (*Serve
return nil, fmt.Errorf("ServerType not specified and is required")
}
sc := ServerConfig{}
err := client.Get(context.TODO(), types.NamespacedName{Name: serverConfig, Namespace: constants.SeldonNamespace}, &sc)
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPISingleCallTimeout)
defer cancel()
err := client.Get(ctx, types.NamespacedName{Name: serverConfig, Namespace: constants.SeldonNamespace}, &sc)
if err != nil {
return nil, fmt.Errorf("failed to get ServerConfig: %w", err)
}
Expand Down
8 changes: 7 additions & 1 deletion operator/controllers/mlops/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package mlops

import (
"context"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -82,10 +83,15 @@ func (r *ExperimentReconciler) handleFinalizer(ctx context.Context, logger logr.
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
logger := log.FromContext(ctx).WithName("ExperimentReconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

now := time.Now()
defer func() {
logger.Info("Finished Experiment Reconcile", "duration", time.Since(now))
}()

experiment := &mlopsv1alpha1.Experiment{}
if err := r.Get(ctx, req.NamespacedName, experiment); err != nil {
if errors.IsNotFound(err) {
Expand Down
8 changes: 7 additions & 1 deletion operator/controllers/mlops/model_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package mlops

import (
"context"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -76,10 +77,15 @@ func (r *ModelReconciler) handleFinalizer(ctx context.Context, logger logr.Logge
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch

func (r *ModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
logger := log.FromContext(ctx).WithName("ModelReconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

now := time.Now()
defer func() {
logger.Info("Finished Model Reconcile", "duration", time.Since(now))
}()

model := &mlopsv1alpha1.Model{}
if err := r.Get(ctx, req.NamespacedName, model); err != nil {
if errors.IsNotFound(err) {
Expand Down
8 changes: 7 additions & 1 deletion operator/controllers/mlops/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package mlops

import (
"context"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -92,10 +93,15 @@ func (r *PipelineReconciler) handleFinalizer(
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
logger := log.FromContext(ctx).WithName("PipelineReconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

now := time.Now()
defer func() {
logger.Info("Finished Pipeline Reconcile", "duration", time.Since(now))
}()

pipeline := &mlopsv1alpha1.Pipeline{}
if err := r.Get(ctx, req.NamespacedName, pipeline); err != nil {
if errors.IsNotFound(err) {
Expand Down
75 changes: 45 additions & 30 deletions operator/controllers/mlops/seldonruntime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package mlops
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -45,10 +46,12 @@ type SeldonRuntimeReconciler struct {
Recorder record.EventRecorder
}

func (r *SeldonRuntimeReconciler) getNumberOfServers(namespace string) (int, error) {
func (r *SeldonRuntimeReconciler) getNumberOfServers(ctx context.Context, namespace string) (int, error) {
servers := mlopsv1alpha1.ServerList{}
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout)
defer cancel()
inNamespace := client.InNamespace(namespace)
err := r.List(context.TODO(), &servers, inNamespace)
err := r.List(ctx, &servers, inNamespace)
if err != nil {
if errors.IsNotFound(err) {
return 0, nil
Expand All @@ -69,7 +72,7 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo
}
}
} else { // runtime is being deleted
numServers, err := r.getNumberOfServers(runtime.Namespace)
numServers, err := r.getNumberOfServers(ctx, runtime.Namespace)
logger.Info("Runtime being deleted", "namespace", runtime.Namespace, "numServers", numServers)
if err != nil {
return true, err
Expand Down Expand Up @@ -119,10 +122,15 @@ func (r *SeldonRuntimeReconciler) handleFinalizer(ctx context.Context, logger lo
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.4/pkg/reconcile
func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
logger := log.FromContext(ctx).WithName("SeldonRuntimeReconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

now := time.Now()
defer func() {
logger.Info("Finished SeldonRuntime Reconcile", "duration", time.Since(now))
}()

seldonRuntime := &mlopsv1alpha1.SeldonRuntime{}
if err := r.Get(ctx, req.NamespacedName, seldonRuntime); err != nil {
if errors.IsNotFound(err) {
Expand All @@ -141,9 +149,9 @@ func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

sr, err := seldonreconcile.NewSeldonRuntimeReconciler(
ctx,
seldonRuntime,
common.ReconcilerConfig{
Ctx: ctx,
Logger: logger,
Client: r.Client,
Recorder: r.Recorder,
Expand All @@ -160,7 +168,7 @@ func (r *SeldonRuntimeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return reconcile.Result{}, err
}

err = sr.Reconcile()
err = sr.Reconcile(ctx)
if err != nil {
return reconcile.Result{}, err
}
Expand All @@ -187,42 +195,47 @@ func seldonRuntimeReady(status mlopsv1alpha1.SeldonRuntimeStatus) bool {
func (r *SeldonRuntimeReconciler) updateStatus(seldonRuntime *mlopsv1alpha1.SeldonRuntime, logger logr.Logger) error {
existingRuntime := &mlopsv1alpha1.SeldonRuntime{}
namespacedName := types.NamespacedName{Name: seldonRuntime.Name, Namespace: seldonRuntime.Namespace}
if err := r.Get(context.TODO(), namespacedName, existingRuntime); err != nil {

ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
defer cancel()
if err := r.Get(ctx, namespacedName, existingRuntime); err != nil {
if apimachinary_errors.IsNotFound(err) { //Ignore NotFound errors
return nil
}
return err
}

if equality.Semantic.DeepEqual(existingRuntime.Status, seldonRuntime.Status) {
return nil
// Not updating as no difference
}

if err := r.Status().Update(ctx, seldonRuntime); err != nil {
logger.Info("Failed to update status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for SeldonRuntime %q: %v", seldonRuntime.Name, err)
return err
} else {
if err := r.Status().Update(context.TODO(), seldonRuntime); err != nil {
logger.Info("Failed to update status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for SeldonRuntime %q: %v", seldonRuntime.Name, err)
return err
} else {
logger.Info("Successfully updated status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
prevWasReady := seldonRuntimeReady(existingRuntime.Status)
currentIsReady := seldonRuntimeReady(seldonRuntime.Status)
if prevWasReady && !currentIsReady {
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "SeldonRuntimeNotReady",
fmt.Sprintf("SeldonRuntime %v is no longer Ready", seldonRuntime.GetName()))
} else if !prevWasReady && currentIsReady {
r.Recorder.Eventf(seldonRuntime, v1.EventTypeNormal, "RuntimeReady",
fmt.Sprintf("SeldonRuntime %v is Ready", seldonRuntime.GetName()))
}
logger.Info("Successfully updated status", "name", seldonRuntime.Name, "namespace", seldonRuntime.Namespace)
prevWasReady := seldonRuntimeReady(existingRuntime.Status)
currentIsReady := seldonRuntimeReady(seldonRuntime.Status)
if prevWasReady && !currentIsReady {
r.Recorder.Eventf(seldonRuntime, v1.EventTypeWarning, "SeldonRuntimeNotReady",
fmt.Sprintf("SeldonRuntime %v is no longer Ready", seldonRuntime.GetName()))
} else if !prevWasReady && currentIsReady {
r.Recorder.Eventf(seldonRuntime, v1.EventTypeNormal, "RuntimeReady",
fmt.Sprintf("SeldonRuntime %v is Ready", seldonRuntime.GetName()))
}
}

return nil
}

// Find SeldonRuntimes that reference the changed SeldonConfig
// TODO: pass an actual context from the caller to be used here
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(_ context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromSeldonConfig(ctx context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout)
defer cancel()

logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromSeldonConfig")
var seldonRuntimes mlopsv1alpha1.SeldonRuntimeList
if err := r.Client.List(ctx, &seldonRuntimes); err != nil {
Expand Down Expand Up @@ -263,9 +276,10 @@ func (r *SeldonRuntimeReconciler) mapSeldonRuntimesByNamespace(ctx context.Conte
return req
}

func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(_ context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(ctx context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout)
defer cancel()

logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromPipeline")
pipeline, ok := obj.(*mlopsv1alpha1.Pipeline)
if !ok {
Expand All @@ -275,9 +289,10 @@ func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromPipeline(_ context.Contex
return r.mapSeldonRuntimesByNamespace(ctx, pipeline.Namespace, logger)
}

func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromModel(_ context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
func (r *SeldonRuntimeReconciler) mapSeldonRuntimesFromModel(ctx context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPICallsTxTimeout)
defer cancel()

logger := log.FromContext(ctx).WithName("mapSeldonRuntimesFromModel")
model, ok := obj.(*mlopsv1alpha1.Model)
if !ok {
Expand Down
26 changes: 15 additions & 11 deletions operator/controllers/mlops/server_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package mlops
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -68,10 +69,15 @@ type ServerReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx).WithName("Reconcile")
logger := log.FromContext(ctx).WithName("ServerReconcile")
ctx, cancel := context.WithTimeout(ctx, constants.ReconcileTimeout)
defer cancel()

now := time.Now()
defer func() {
logger.Info("Finished Server Reconcile", "duration", time.Since(now))
}()

logger.Info("Received reconcile for Server", "name", req.Name, "namespace", req.NamespacedName.Namespace)

server := &mlopsv1alpha1.Server{}
Expand Down Expand Up @@ -110,13 +116,11 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
var sr common.Reconciler
if r.UseDeploymentsForServers {
sr, err = serverreconcile.NewServerReconcilerWithDeployment(server, common.ReconcilerConfig{
Ctx: ctx,
Logger: logger,
Client: r.Client,
})
} else {
sr, err = serverreconcile.NewServerReconciler(server, common.ReconcilerConfig{
Ctx: ctx,
Logger: logger,
Client: r.Client,
})
Expand All @@ -136,7 +140,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

// attempt to deploy
err = sr.Reconcile()
err = sr.Reconcile(ctx)
if err != nil {
logger.Error(err, "Failed reconciling", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec)
r.updateStatusFromError(ctx, logger, server, err)
Expand All @@ -162,7 +166,7 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
server.Status.Selector = selector
server.Status.Replicas = *server.Spec.Replicas

err = r.updateStatus(server)
err = r.updateStatus(ctx, server)
if err != nil {
logger.Error(err, "Failed updating status", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec)
return reconcile.Result{}, err
Expand All @@ -184,10 +188,10 @@ func (r *ServerReconciler) updateStatusFromError(ctx context.Context, logger log
}
}

func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error {
func (r *ServerReconciler) updateStatus(ctx context.Context, server *mlopsv1alpha1.Server) error {
existingServer := &mlopsv1alpha1.Server{}
namespacedName := types.NamespacedName{Name: server.Name, Namespace: server.Namespace}
if err := r.Get(context.TODO(), namespacedName, existingServer); err != nil {
if err := r.Get(ctx, namespacedName, existingServer); err != nil {
if apimachinary_errors.IsNotFound(err) { //Ignore NotFound errors
return nil
}
Expand All @@ -197,7 +201,7 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error {
if equality.Semantic.DeepEqual(existingServer.Status, server.Status) {
// Not updating as no difference
} else {
if err := r.Status().Update(context.TODO(), server); err != nil {
if err := r.Status().Update(ctx, server); err != nil {
r.Recorder.Eventf(server, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for Model %q: %v", server.Name, err)
return err
Expand All @@ -217,10 +221,10 @@ func (r *ServerReconciler) updateStatus(server *mlopsv1alpha1.Server) error {
}

// Find Servers that need reconciliation from a change to a given ServerConfig
// TODO: pass an actual context from the caller to be used here
func (r *ServerReconciler) mapServerFromServerConfig(_ context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(context.Background(), constants.K8sAPICallsTxTimeout)
func (r *ServerReconciler) mapServerFromServerConfig(ctx context.Context, obj client.Object) []reconcile.Request {
ctx, cancel := context.WithTimeout(ctx, constants.K8sAPISingleCallTimeout)
defer cancel()

logger := log.FromContext(ctx).WithName("mapServerFromServerConfig")
var servers mlopsv1alpha1.ServerList
if err := r.Client.List(ctx, &servers); err != nil {
Expand Down
1 change: 0 additions & 1 deletion operator/controllers/mlops/serverconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type ServerConfigReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *ServerConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

// Nothing is done with ctx or req here: the method always returns an
// empty ctrl.Result and a nil error.
return ctrl.Result{}, nil
}

Expand Down
5 changes: 2 additions & 3 deletions operator/controllers/reconcilers/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

type Reconciler interface {
Reconcile() error
Reconcile(ctx context.Context) error
GetResources() []client.Object
GetConditions() []*apis.Condition
}
Expand All @@ -42,15 +42,14 @@ func CopyMap[K, V comparable](m map[K]V) map[K]V {
}

type ReplicaHandler interface {
GetReplicas() (int32, error)
GetReplicas(ctx context.Context) (int32, error)
}

// LabelHandler declares a single method, GetLabelSelector, which returns a
// string.
// NOTE(review): presumably the string is a Kubernetes label selector
// expression — confirm against the implementations.
type LabelHandler interface {
GetLabelSelector() string
}

type ReconcilerConfig struct {
Ctx context.Context
Client client.Client
Recorder record.EventRecorder
Logger logr.Logger
Expand Down
Loading
Loading