Skip to content

Commit 24950eb

Browse files
authored
fix(operator): incorrect expected replicas notification (#6890)
* wip * wip * stateful sets working * only notify scheduler of replicas if deployment successful
1 parent cdc0c97 commit 24950eb

File tree

12 files changed

+253
-45
lines changed

12 files changed

+253
-45
lines changed

operator/controllers/mlops/server_controller.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/controller-runtime/pkg/log"
3131
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3232

33-
"github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
3433
mlopsv1alpha1 "github.com/seldonio/seldon-core/operator/v2/apis/mlops/v1alpha1"
3534
"github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/common"
3635
serverreconcile "github.com/seldonio/seldon-core/operator/v2/controllers/reconcilers/server"
@@ -106,12 +105,6 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
106105
return reconcile.Result{}, reconcile.TerminalError(err)
107106
}
108107

109-
if err := r.Scheduler.ServerNotify(ctx, nil, []v1alpha1.Server{*server}, false); err != nil {
110-
logger.Error(err, "Failed calling ServerNotify", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec)
111-
r.updateStatusFromError(ctx, logger, server, err)
112-
return reconcile.Result{}, err
113-
}
114-
115108
server.Spec.Replicas = ptr.To(int32(scalingSpec.Replicas))
116109

117110
var sr common.Reconciler
@@ -142,13 +135,23 @@ func (r *ServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
142135
return reconcile.Result{}, err
143136
}
144137

138+
// attempt to deploy
145139
err = sr.Reconcile()
146140
if err != nil {
147141
logger.Error(err, "Failed reconciling", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec)
148142
r.updateStatusFromError(ctx, logger, server, err)
149143
return reconcile.Result{}, err
150144
}
151145

146+
// Only after the pods have been deployed successfully do we notify the scheduler of the number of Servers it should
147+
// expect to connect. It's important we do this after we deploy; otherwise, if deployment fails, the scheduler
148+
// may expect the wrong number and end up waiting for the 10-minute timeout before loading models onto the Servers.
149+
if err := r.Scheduler.ServerNotify(ctx, nil, []mlopsv1alpha1.Server{*server}, false); err != nil {
150+
logger.Error(err, "Failed calling ServerNotify", "name", req.Name, "namespace", req.Namespace, "spec", server.Spec)
151+
r.updateStatusFromError(ctx, logger, server, err)
152+
return reconcile.Result{}, err
153+
}
154+
152155
conditions := sr.GetConditions()
153156
for _, condition := range conditions {
154157
server.Status.SetCondition(condition)

operator/controllers/mlops/suite_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,14 +467,15 @@ var _ = Describe("Controller", func() {
467467
})
468468

469469
Context("When creating a Server spec", func() {
470-
It("Accepts a valid Spec with minReplicas = 2 replicas not set, therefore replicas should take minReplicas value of 1", func() {
470+
It("Accepts a valid Spec with minReplicas = 2, replicas not set, therefore replicas should take minReplicas value of 2", func() {
471471
testID := "test-mlserver-2"
472472
minReplicas := ptr.To(int32(2))
473473

474474
serverNotifyCalled := &atomic.Bool{}
475475
addExpectedServerNotifyCall(testID, mlopsv1alpha1.ScalingSpec{
476476
MinReplicas: minReplicas,
477477
MaxReplicas: ptr.To(int32(2)),
478+
Replicas: minReplicas,
478479
}, serverNotifyCalled)
479480

480481
ctx := context.Background()

operator/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func main() {
159159
schedulerClient := scheduler.NewSchedulerClient(logger,
160160
mgr.GetClient(),
161161
mgr.GetEventRecorderFor("scheduler-client"),
162-
*tlsOptions)
162+
*tlsOptions, useDeploymentsForServers)
163163

164164
if err = (&mlopscontrollers.ModelReconciler{
165165
Client: mgr.GetClient(),

operator/scheduler/client.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,31 +50,33 @@ const (
5050

5151
type SchedulerClient struct {
5252
client.Client
53-
logger logr.Logger
54-
callOptions []grpc.CallOption
55-
recorder record.EventRecorder
56-
seldonRuntimes map[string]*grpc.ClientConn // map of namespace to grpc connection
57-
tlsOptions tls.TLSOptions
58-
mu sync.Mutex
53+
logger logr.Logger
54+
callOptions []grpc.CallOption
55+
recorder record.EventRecorder
56+
seldonRuntimes map[string]*grpc.ClientConn // map of namespace to grpc connection
57+
tlsOptions tls.TLSOptions
58+
useDeploymentsForServers bool
59+
mu sync.Mutex
5960
}
6061

6162
// connect on demand by adding getConnection(namespace), which connects to the scheduler if no connection exists yet.
6263
//
6364
// For this will need to know ports (hardwire for now to 9004 and 9044 - ssl comes from envvar - so always
6465
// the same for all schedulers)
65-
func NewSchedulerClient(logger logr.Logger, client client.Client, recorder record.EventRecorder, tlsOptions tls.TLSOptions) *SchedulerClient {
66+
func NewSchedulerClient(logger logr.Logger, client client.Client, recorder record.EventRecorder, tlsOptions tls.TLSOptions, useDeploymentsForServers bool) *SchedulerClient {
6667
opts := []grpc.CallOption{
6768
grpc.MaxCallSendMsgSize(math.MaxInt32),
6869
grpc.MaxCallRecvMsgSize(math.MaxInt32),
6970
}
7071

7172
return &SchedulerClient{
72-
Client: client,
73-
logger: logger.WithName("SchedulerClient"),
74-
callOptions: opts,
75-
recorder: recorder,
76-
seldonRuntimes: make(map[string]*grpc.ClientConn),
77-
tlsOptions: tlsOptions,
73+
Client: client,
74+
logger: logger.WithName("SchedulerClient"),
75+
callOptions: opts,
76+
recorder: recorder,
77+
seldonRuntimes: make(map[string]*grpc.ClientConn),
78+
tlsOptions: tlsOptions,
79+
useDeploymentsForServers: useDeploymentsForServers,
7880
}
7981
}
8082

@@ -161,7 +163,6 @@ func (s *SchedulerClient) handleControlPlaneEvent(ctx context.Context, grpcClien
161163
s.logger.Error(err, "Failed to send registered server to scheduler")
162164
return err
163165
}
164-
165166
return nil
166167
case scheduler.ControlPlaneResponse_SEND_EXPERIMENTS:
167168
err := s.handleExperiments(ctx, grpcClient, namespace)

operator/scheduler/control_plane_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ import (
1616
"testing"
1717
"time"
1818

19+
"github.com/gotidy/ptr"
1920
. "github.com/onsi/gomega"
21+
v1 "k8s.io/api/apps/v1"
2022
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2123
"sigs.k8s.io/controller-runtime/pkg/client"
2224

@@ -161,6 +163,17 @@ func TestControlPlaneEvents(t *testing.T) {
161163
},
162164
Spec: mlopsv1alpha1.ServerSpec{},
163165
},
166+
&v1.StatefulSet{
167+
TypeMeta: metav1.TypeMeta{},
168+
ObjectMeta: metav1.ObjectMeta{
169+
Name: "foo",
170+
Namespace: "default",
171+
Generation: 1,
172+
},
173+
Spec: v1.StatefulSetSpec{
174+
Replicas: ptr.Int32(1),
175+
},
176+
},
164177
},
165178
expected_requests_pipelines: []*scheduler.LoadPipelineRequest{
166179
{
@@ -233,7 +246,7 @@ func TestControlPlaneEvents(t *testing.T) {
233246
t.Run(test.name, func(t *testing.T) {
234247
grpcClient := mockSchedulerGrpcClient{}
235248

236-
controller := newMockControllerClient(test.existing_resources...)
249+
controller := newMockControllerClient(false, test.existing_resources...)
237250

238251
err := controller.SubscribeControlPlaneEvents(context.Background(), &grpcClient, "")
239252
g.Expect(err).To(BeNil())

operator/scheduler/experiment_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestSubscribeExperimentsEvents(t *testing.T) {
325325
responses_subscribe_experiments: test.results,
326326
}
327327
}
328-
controller := newMockControllerClient(test.existing_resources...)
328+
controller := newMockControllerClient(false, test.existing_resources...)
329329
err := controller.handleExperiments(context.Background(), &grpcClient, "")
330330
g.Expect(err).To(BeNil())
331331
err = controller.SubscribeExperimentEvents(context.Background(), &grpcClient, "")

operator/scheduler/model_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func TestSubscribeModelEvents(t *testing.T) {
189189
responses_subscribe_models: test.results,
190190
}
191191
}
192-
controller := newMockControllerClient(test.existing_resources...)
192+
controller := newMockControllerClient(false, test.existing_resources...)
193193
err := controller.handleModels(context.Background(), &grpcClient, "")
194194
g.Expect(err).To(BeNil())
195195
err = controller.SubscribeModelEvents(context.Background(), &grpcClient, "")

operator/scheduler/pipeline_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ func TestSubscribePipelineEvents(t *testing.T) {
419419
responses_subscribe_pipelines: test.results,
420420
}
421421
}
422-
controller := newMockControllerClient(test.existing_resources...)
422+
controller := newMockControllerClient(false, test.existing_resources...)
423423
err := controller.handlePipelines(context.Background(), &grpcClient, "")
424424
g.Expect(err).To(BeNil())
425425
err = controller.SubscribePipelineEvents(context.Background(), &grpcClient, "")

operator/scheduler/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ import (
2929
)
3030

3131
func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler.SchedulerClient, servers []v1alpha1.Server, isFirstSync bool) error {
32+
logger := s.logger.WithName("NotifyServer")
3233
if len(servers) == 0 {
34+
logger.Info("No servers to notify scheduler, skipping")
3335
return nil
3436
}
3537

36-
logger := s.logger.WithName("NotifyServer")
3738
if grpcClient == nil {
3839
// we assume that all servers are in the same namespace
3940
namespace := servers[0].Namespace

operator/scheduler/server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func TestServerNotify(t *testing.T) {
244244
grpcClient := mockSchedulerGrpcClient{
245245
requests_servers: []*scheduler.ServerNotify{},
246246
}
247-
controller := newMockControllerClient()
247+
controller := newMockControllerClient(false)
248248
err := controller.ServerNotify(context.Background(), &grpcClient, test.servers, false)
249249
g.Expect(err).To(BeNil())
250250

@@ -470,7 +470,7 @@ func TestSubscribeServerEvents(t *testing.T) {
470470
}
471471

472472
existing_resources := []client.Object{&test.existingServer}
473-
controller := newMockControllerClient(existing_resources...)
473+
controller := newMockControllerClient(false, existing_resources...)
474474
err := controller.SubscribeServerEvents(context.Background(), &grpcClient, "")
475475
g.Expect(err).To(BeNil())
476476

0 commit comments

Comments
 (0)