@@ -35,8 +35,8 @@ import (
35
35
"sigs.k8s.io/controller-runtime/pkg/predicate"
36
36
"sigs.k8s.io/controller-runtime/pkg/reconcile"
37
37
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
38
+ kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
38
39
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
39
- "sigs.k8s.io/kueue/pkg/controller/core/indexer"
40
40
"sigs.k8s.io/kueue/pkg/workload"
41
41
42
42
"tpu-slice-controller/api/v1alpha1"
@@ -45,6 +45,8 @@ import (
45
45
const (
46
46
CleanupSliceFinalizerName = "accelerator.gke.io/slice"
47
47
TPUReservationSubblockLabel = "cloud.google.com/gke-tpu-reservation-subblock"
48
+
49
+ tasLabelValue = "true"
48
50
)
49
51
50
52
var (
@@ -69,7 +71,6 @@ func NewWorkloadReconciler(cl client.Client) *WorkloadReconciler {
69
71
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch
70
72
// +kubebuilder:rbac:groups=slice.accelerator.gke.io,resources=slices,verbs=get;list;watch;create;update;patch;delete
71
73
// +kubebuilder:rbac:groups=slice.accelerator.gke.io,resources=slices/finalizers,verbs=update
72
- // +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch
73
74
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
74
75
75
76
func (r * WorkloadReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
@@ -137,6 +138,7 @@ func (r *WorkloadReconciler) finalize(ctx context.Context, wl *kueue.Workload) e
137
138
138
139
controllerutil .RemoveFinalizer (wl , CleanupSliceFinalizerName )
139
140
if err := r .client .Update (ctx , wl ); err != nil {
141
+ log .Error (err , "Removing finalizer" )
140
142
return err
141
143
}
142
144
@@ -151,25 +153,18 @@ func (r *WorkloadReconciler) podsGracefullyTerminated(ctx context.Context, wl *k
151
153
}
152
154
153
155
owner := metav1 .GetControllerOf (wl )
154
-
155
- jobSet := & jobset.JobSet {}
156
- err := r .client .Get (ctx , types.NamespacedName {Name : owner .Name , Namespace : wl .Namespace }, jobSet )
157
- if client .IgnoreNotFound (err ) != nil {
158
- return false , err
159
- }
160
- // That means the JobSet has already been deleted, along with all associated Jobs and Pods
161
- // we should delete Slice and finalize Workload.
162
- if err != nil {
163
- //nolint:nilerr // Return nil to delete Slice and finalize Workload.
156
+ if owner == nil {
157
+ // That means the JobSet has already been deleted, along with all associated Jobs and Pods
158
+ // we should delete Slice and finalize Workload.
164
159
return true , nil
165
160
}
166
161
167
162
pods := & corev1.PodList {}
168
163
opts := []client.ListOption {
169
- client .InNamespace (jobSet .Namespace ),
170
- client.MatchingLabels {jobset .JobSetNameKey : jobSet .Name },
164
+ client .InNamespace (wl .Namespace ),
165
+ client.MatchingLabels {jobset .JobSetNameKey : owner .Name },
171
166
}
172
- err = r .client .List (ctx , pods , opts ... )
167
+ err : = r .client .List (ctx , pods , opts ... )
173
168
if err != nil {
174
169
return false , err
175
170
}
@@ -241,83 +236,100 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
241
236
For (& kueue.Workload {}).
242
237
Named ("workload" ).
243
238
WithEventFilter (r ).
244
- Watches (& jobset. JobSet {}, & jobSetHandler {r : r }).
239
+ Watches (& corev1. Pod {}, & podHandler {r : r }).
245
240
Complete (r )
246
241
}
247
242
248
243
var _ predicate.Predicate = (* WorkloadReconciler )(nil )
249
244
250
245
func (r * WorkloadReconciler ) Create (e event.CreateEvent ) bool {
251
- return r .handle (e .Object )
246
+ return r .handleEvent (e .Object )
252
247
}
253
248
254
- func (r * WorkloadReconciler ) Delete (event.DeleteEvent ) bool {
255
- // Nothing handle for Delete event.
256
- return false
249
+ func (r * WorkloadReconciler ) Delete (e event.DeleteEvent ) bool {
250
+ return r .handleEvent (e .Object )
257
251
}
258
252
259
253
func (r * WorkloadReconciler ) Update (e event.UpdateEvent ) bool {
260
- return r .handle (e .ObjectNew )
254
+ return r .handleEvent (e .ObjectNew )
261
255
}
262
256
263
257
func (r * WorkloadReconciler ) Generic (event.GenericEvent ) bool {
264
258
// Nothing handle for Generic event.
265
259
return false
266
260
}
267
261
268
- func (r * WorkloadReconciler ) handle (obj client.Object ) bool {
262
+ func (r * WorkloadReconciler ) handleEvent (obj client.Object ) bool {
269
263
wl , isWorkload := obj .(* kueue.Workload )
270
264
if ! isWorkload {
271
265
return true
272
266
}
273
- return r .handleWorkload (wl )
267
+ return r .shouldHandleWorkload (wl )
274
268
}
275
269
276
- func (r * WorkloadReconciler ) handleWorkload (wl * kueue.Workload ) bool {
270
+ func (r * WorkloadReconciler ) shouldHandleWorkload (wl * kueue.Workload ) bool {
277
271
// We should handle all Workloads that have the cleanup slice finalizer.
278
272
return controllerutil .ContainsFinalizer (wl , CleanupSliceFinalizerName ) || ! r .shouldFinalize (wl )
279
273
}
280
274
281
- var _ handler.EventHandler = (* jobSetHandler )(nil )
275
+ var _ handler.EventHandler = (* podHandler )(nil )
282
276
283
- type jobSetHandler struct {
277
+ type podHandler struct {
284
278
r * WorkloadReconciler
285
279
}
286
280
287
- func (h * jobSetHandler ) Generic (context.Context , event.GenericEvent , workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
281
+ func (h * podHandler ) Generic (context.Context , event.GenericEvent , workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
288
282
}
289
283
290
- func (h * jobSetHandler ) Create (_ context.Context , e event.CreateEvent , q workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
291
- h .handle (e .Object , q )
284
+ func (h * podHandler ) Create (context.Context , event.CreateEvent , workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
292
285
}
293
286
294
- func (h * jobSetHandler ) Update ( _ context.Context , e event.UpdateEvent , q workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
295
- h .handle ( e . ObjectNew , q )
287
+ func (h * podHandler ) Delete ( ctx context.Context , e event.DeleteEvent , q workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
288
+ h .handleEvent ( ctx , e . Object , q )
296
289
}
297
290
298
- func (h * jobSetHandler ) Delete (context.Context , event.DeleteEvent , workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
291
+ func (h * podHandler ) Update (context.Context , event.UpdateEvent , workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
299
292
}
300
293
301
- func (h * jobSetHandler ) handle (obj client.Object , q workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
302
- jobSet , isJobSet := obj .(* jobset.JobSet )
303
- if ! isJobSet {
294
+ func (h * podHandler ) handleEvent (ctx context.Context , obj client.Object , q workqueue.TypedRateLimitingInterface [reconcile.Request ]) {
295
+ pod , isPod := obj .(* corev1.Pod )
296
+ // Only Pods with the TAS label should be handled.
297
+ if ! isPod || pod .Labels [kueuealpha .TASLabel ] != tasLabelValue {
304
298
return
305
299
}
306
300
307
- ctx := context .Background ()
301
+ jobSetName := pod .Labels [jobset .JobSetNameKey ]
302
+ // Only pods owned by a JobSet should be handled.
303
+ if jobSetName == "" {
304
+ return
305
+ }
306
+
307
+ log := ctrl .LoggerFrom (ctx ).WithValues (
308
+ "pod" , klog .KObj (pod ),
309
+ "jobSet" , klog.ObjectRef {Name : jobSetName , Namespace : pod .Namespace },
310
+ )
311
+ ctrl .LoggerInto (ctx , log )
312
+
313
+ log .V (5 ).Info ("Handle Pod event" )
314
+
308
315
workloads := & kueue.WorkloadList {}
309
316
opts := []client.ListOption {
310
- client .InNamespace (jobSet .Namespace ),
311
- client.MatchingFields {indexer . OwnerReferenceUID : string ( jobSet . GetUID ()) },
317
+ client .InNamespace (pod .Namespace ),
318
+ client.MatchingFields {OwnerReferenceName : jobSetName },
312
319
}
313
320
err := h .r .client .List (ctx , workloads , opts ... )
314
321
if err != nil {
315
- h .r .log .Error (err , "failed to list workloads" , "jobset" , klog .KObj (jobSet ))
322
+ log .Error (err , "Failed to list workloads" )
323
+ return
324
+ }
325
+
326
+ if len (workloads .Items ) == 0 {
327
+ log .V (5 ).Info ("No Workloads found for the JobSet – skipping handling" )
316
328
return
317
329
}
318
330
319
331
for _ , wl := range workloads .Items {
320
- if h .r .handleWorkload (& wl ) {
332
+ if h .r .shouldHandleWorkload (& wl ) {
321
333
req := reconcile.Request {
322
334
NamespacedName : types.NamespacedName {
323
335
Name : wl .Name ,
0 commit comments