Skip to content

Commit ebea3ee

Browse files
authored
[Slice] Small Refactor
2 parents 2b8fd11 + db1a143 commit ebea3ee

File tree

4 files changed, with 74 additions (+74) and 51 deletions (-51).

4 files changed, with 74 additions (+74) and 51 deletions (-51).

slice/internal/controller/workload_controller.go

Lines changed: 30 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -97,11 +97,11 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
9797
}
9898

9999
log := ctrl.LoggerFrom(ctx)
100-
log.V(2).Info("Reconcile Workload")
100+
log.V(3).Info("Reconcile Workload")
101101

102102
if r.shouldFinalize(wl) {
103103
if controllerutil.ContainsFinalizer(wl, SliceControllerName) {
104-
err = r.client.Delete(ctx, r.newEmptySlice(wl))
104+
err = r.client.Delete(ctx, core.SliceWithMetadata(wl))
105105
if client.IgnoreNotFound(err) != nil {
106106
return ctrl.Result{}, err
107107
}
@@ -112,50 +112,51 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
112112
}
113113
return ctrl.Result{}, client.IgnoreNotFound(err)
114114
}
115-
log.V(5).Info("Removed finalizer")
115+
log.V(3).Info("Removed finalizer")
116116
}
117117
return ctrl.Result{}, nil
118118
}
119119

120+
if !r.isRelevantWorkload(wl, log) {
121+
return ctrl.Result{}, nil
122+
}
123+
120124
ac, err := r.sliceAC(ctx, wl)
121125
if err != nil {
122126
return reconcile.Result{}, err
123127
}
124128
if ac == nil {
125-
log.V(5).Info("Admission check not found – ignoring reconciliation for now")
129+
log.V(3).Info("Admission check not found - skipping reconciliation")
126130
return reconcile.Result{}, nil
127131
}
128132

129133
log = log.WithValues("admissionCheck", ac.Name)
130134
ctrl.LoggerInto(ctx, log)
131135

132-
if !r.isRelevantWorkload(wl, log) {
133-
return ctrl.Result{}, nil
134-
}
135-
136136
if controllerutil.AddFinalizer(wl, SliceControllerName) {
137137
if err = r.client.Update(ctx, wl); err != nil {
138138
if !apierrors.IsNotFound(err) {
139139
log.Error(err, "Failed to add finalizer")
140140
}
141141
return ctrl.Result{}, client.IgnoreNotFound(err)
142142
}
143-
log.V(5).Info("Added finalizer")
143+
log.V(3).Info("Added finalizer")
144144
return ctrl.Result{}, nil
145145
}
146146

147-
slice := r.newEmptySlice(wl)
148-
149-
err = r.client.Get(ctx, client.ObjectKeyFromObject(slice), slice)
150-
if client.IgnoreNotFound(err) != nil {
147+
slice := v1alpha1.Slice{}
148+
err = r.client.Get(ctx, core.SliceKeyFromWorkload(wl), &slice)
149+
if apierrors.IsNotFound(err) {
150+
// slice not found, create it and exit.
151+
err = r.createSlice(ctx, log, wl, ac)
152+
return ctrl.Result{}, err
153+
} else if err != nil {
154+
// error fetching slice
151155
log.Error(err, "Failed to fetch the Slice")
152156
return ctrl.Result{}, err
153157
}
154-
if err != nil {
155-
return ctrl.Result{}, r.createSlice(ctx, wl, ac)
156-
}
157158

158-
err = r.syncAdmissionCheckStatus(ctx, wl, ac, slice)
159+
err = r.syncAdmissionCheckStatus(ctx, wl, ac, &slice)
159160
return ctrl.Result{}, client.IgnoreNotFound(err)
160161
}
161162

@@ -214,22 +215,7 @@ func (r *WorkloadReconciler) sliceAC(ctx context.Context, wl *kueue.Workload) (*
214215
return workload.FindAdmissionCheck(wl.Status.AdmissionChecks, relevantChecks[0]), nil
215216
}
216217

217-
func (r *WorkloadReconciler) newEmptySlice(wl *kueue.Workload) *v1alpha1.Slice {
218-
return &v1alpha1.Slice{
219-
ObjectMeta: metav1.ObjectMeta{
220-
Name: wl.Name,
221-
Namespace: wl.Namespace,
222-
},
223-
}
224-
}
225-
226-
func (r *WorkloadReconciler) newSlice(wl *kueue.Workload) (*v1alpha1.Slice, error) {
227-
slice := r.newEmptySlice(wl)
228-
229-
if err := controllerutil.SetControllerReference(wl, slice, r.client.Scheme()); err != nil {
230-
return nil, err
231-
}
232-
218+
func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, wl *kueue.Workload) {
233219
nodeSelectors := sets.New[string]()
234220
for _, psa := range wl.Status.Admission.PodSetAssignments {
235221
// we already validated that all assignments have a valid level,
@@ -242,21 +228,19 @@ func (r *WorkloadReconciler) newSlice(wl *kueue.Workload) (*v1alpha1.Slice, erro
242228
slice.Spec.NodeSelector = map[string][]string{
243229
TPUReservationSubblockLabel: sets.List(nodeSelectors),
244230
}
245-
return slice, nil
246231
}
247232

248-
func (r *WorkloadReconciler) createSlice(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState) error {
249-
log := ctrl.LoggerFrom(ctx)
233+
func (r *WorkloadReconciler) createSlice(ctx context.Context, log logr.Logger, wl *kueue.Workload, ac *kueue.AdmissionCheckState) error {
234+
slice := core.SliceWithMetadata(wl)
235+
log = log.WithValues("slice", klog.KObj(slice))
236+
log.V(3).Info("Creating Slice")
250237

251-
slice, err := r.newSlice(wl)
252-
if err != nil {
238+
if err := controllerutil.SetControllerReference(wl, slice, r.client.Scheme()); err != nil {
253239
return err
254240
}
241+
parseTopologyAssignmentIntoNodeSelector(slice, wl)
255242

256-
log = log.WithValues("slice", klog.KObj(slice))
257-
258-
err = r.client.Create(ctx, slice)
259-
if err != nil {
243+
if err := r.client.Create(ctx, slice); err != nil {
260244
msg := fmt.Sprintf("Error creating Slice %q: %v", slice.Name, err)
261245
log.Error(err, msg)
262246
r.record.Event(wl, corev1.EventTypeWarning, FailedCreateSliceEventType, api.TruncateEventMessage(msg))
@@ -265,8 +249,8 @@ func (r *WorkloadReconciler) createSlice(ctx context.Context, wl *kueue.Workload
265249
return errors.Join(err, patchErr)
266250
}
267251

268-
msg := fmt.Sprintf("The Slice %q has been created", slice.Name)
269-
log.V(5).Info(msg)
252+
msg := fmt.Sprintf("The Slice %s has been created", client.ObjectKeyFromObject(slice))
253+
log.V(3).Info(msg)
270254
r.record.Event(wl, corev1.EventTypeNormal, SliceCreatedEventType, msg)
271255
ac.Message = msg
272256

@@ -356,11 +340,11 @@ func (h *sliceHandler) handleEvent(ctx context.Context, obj client.Object, q wor
356340

357341
owner := metav1.GetControllerOf(slice)
358342
if owner == nil {
359-
log.V(5).Info("Owner not found")
343+
log.V(3).Info("Owner not found")
360344
return
361345
}
362346

363-
log.V(5).Info("Handle Slice event", "workload", klog.KRef(slice.Namespace, slice.Name))
347+
log.V(3).Info("Handle Slice event", "workload", klog.KRef(slice.Namespace, slice.Name))
364348

365349
req := reconcile.Request{
366350
NamespacedName: types.NamespacedName{

slice/internal/controller/workload_controller_test.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -328,7 +328,7 @@ func TestWorkloadReconciler(t *testing.T) {
328328
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
329329
State: kueue.CheckStatePending,
330330
LastTransitionTime: metav1.NewTime(now),
331-
Message: fmt.Sprintf(`The Slice "%s" has been created`, baseWorkloadName),
331+
Message: "The Slice default/workload has been created",
332332
}).
333333
Obj(),
334334
},
@@ -338,7 +338,7 @@ func TestWorkloadReconciler(t *testing.T) {
338338
Key: client.ObjectKeyFromObject(baseWorkloadWrapper),
339339
EventType: corev1.EventTypeNormal,
340340
Reason: SliceCreatedEventType,
341-
Message: fmt.Sprintf(`The Slice "%s" has been created`, baseWorkloadName),
341+
Message: "The Slice default/workload has been created",
342342
},
343343
},
344344
},
@@ -355,7 +355,7 @@ func TestWorkloadReconciler(t *testing.T) {
355355
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
356356
State: kueue.CheckStatePending,
357357
LastTransitionTime: metav1.NewTime(now),
358-
Message: fmt.Sprintf(`The Slice "%s" has been created`, baseWorkloadName),
358+
Message: "The Slice default/workload has been created",
359359
}).
360360
Obj(),
361361
},
@@ -367,7 +367,7 @@ func TestWorkloadReconciler(t *testing.T) {
367367
Key: client.ObjectKeyFromObject(baseWorkloadWrapper),
368368
EventType: corev1.EventTypeNormal,
369369
Reason: SliceCreatedEventType,
370-
Message: fmt.Sprintf(`The Slice "%s" has been created`, baseWorkloadName),
370+
Message: "The Slice default/workload has been created",
371371
},
372372
},
373373
},

slice/internal/core/slice.go

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,39 @@
1+
// Copyright The Kubernetes Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package core
16+
17+
import (
18+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"sigs.k8s.io/controller-runtime/pkg/client"
20+
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
21+
22+
"tpu-slice-controller/api/v1alpha1"
23+
)
24+
25+
// TODO: we will soon need to key per podset, in #520
26+
func SliceKeyFromWorkload(wl *kueue.Workload) client.ObjectKey {
27+
slice := SliceWithMetadata(wl)
28+
return client.ObjectKeyFromObject(slice)
29+
}
30+
31+
// TODO: we will soon need to key per podset, in #520
32+
func SliceWithMetadata(wl *kueue.Workload) *v1alpha1.Slice {
33+
return &v1alpha1.Slice{
34+
ObjectMeta: metav1.ObjectMeta{
35+
Name: wl.Name,
36+
Namespace: wl.Namespace,
37+
},
38+
}
39+
}

slice/test/e2e/jobset_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,7 @@ var _ = ginkgo.Describe("JobSet", func() {
210210
g.Expect(createdWorkload.Status.AdmissionChecks).Should(gomega.BeComparableTo([]kueue.AdmissionCheckState{{
211211
Name: kueue.AdmissionCheckReference(ac.Name),
212212
State: kueue.CheckStatePending,
213-
Message: fmt.Sprintf("The Slice %q has been created", createdSlice.Name),
213+
Message: fmt.Sprintf("The Slice %s/%s has been created", createdSlice.Namespace, createdSlice.Name),
214214
}}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "PodSetUpdates")))
215215
}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
216216
})

0 commit comments

Comments
 (0)