Skip to content

Commit 556b5b2

Browse files
committed
Use RequeueAfter instead of JobSet and Pods event handler.
1 parent 3d5568e commit 556b5b2

File tree

1 file changed

+3
-126
lines changed

1 file changed

+3
-126
lines changed

slice/internal/controller/workload_controller.go

Lines changed: 3 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import (
4141
"sigs.k8s.io/controller-runtime/pkg/predicate"
4242
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4343
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
44-
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
4544
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
4645
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
4746
"sigs.k8s.io/kueue/pkg/workload"
@@ -110,9 +109,9 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
110109
log.V(3).Info(fmt.Sprintf("Cleaning up the Slice and finalize the Workload because %s", reason))
111110
cleanedUp, err := r.cleanupSlice(ctx, wl)
112111
if err != nil || !cleanedUp {
113-
return ctrl.Result{}, err
112+
// Recheck again in 5 seconds.
113+
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
114114
}
115-
116115
err = r.finalizeWorkload(ctx, wl)
117116
return ctrl.Result{}, client.IgnoreNotFound(err)
118117
}
@@ -449,8 +448,6 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
449448
Named("workload_controller").
450449
WithEventFilter(r).
451450
Watches(&v1alpha1.Slice{}, &sliceHandler{client: r.client}).
452-
Watches(&jobset.JobSet{}, &jobSetHandler{client: r.client}).
453-
Watches(&corev1.Pod{}, &podHandler{client: r.client}).
454451
Complete(r)
455452
}
456453

@@ -500,6 +497,7 @@ func (h *sliceHandler) Generic(context.Context, event.GenericEvent, workqueue.Ty
500497
}
501498

502499
func (h *sliceHandler) Create(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
500+
// No need to handle create event. We should wait for at least Forming state.
503501
}
504502

505503
func (h *sliceHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
@@ -536,124 +534,3 @@ func (h *sliceHandler) handleEvent(ctx context.Context, obj client.Object, q wor
536534

537535
q.AddAfter(req, updatesBatchPeriod)
538536
}
539-
540-
var _ handler.EventHandler = (*jobSetHandler)(nil)
541-
542-
type jobSetHandler struct {
543-
client client.Client
544-
}
545-
546-
func (h *jobSetHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
547-
}
548-
549-
func (h *jobSetHandler) Create(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
550-
}
551-
552-
func (h *jobSetHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
553-
h.handleEvent(ctx, e.Object, q)
554-
}
555-
556-
func (h *jobSetHandler) Update(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
557-
}
558-
559-
func (h *jobSetHandler) handleEvent(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
560-
jobSet, isJobSet := obj.(*jobset.JobSet)
561-
// Only JobSet should be handled.
562-
if !isJobSet {
563-
return
564-
}
565-
566-
log := ctrl.LoggerFrom(ctx).WithValues("jobSet", klog.KRef(jobSet.Namespace, jobSet.Name))
567-
ctrl.LoggerInto(ctx, log)
568-
569-
log.V(3).Info("Handle JobSet event")
570-
571-
handleEventForJobSet(ctx, h.client, jobSet, q)
572-
}
573-
574-
func handleEventForJobSet(ctx context.Context, c client.Client, jobSet *jobset.JobSet, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
575-
log := ctrl.LoggerFrom(ctx)
576-
577-
workloads := &kueue.WorkloadList{}
578-
opts := []client.ListOption{
579-
client.InNamespace(jobSet.Namespace),
580-
client.MatchingFields{OwnerReferenceUID: string(jobSet.UID)},
581-
}
582-
err := c.List(ctx, workloads, opts...)
583-
if err != nil {
584-
log.Error(err, "Failed to list workloads")
585-
return
586-
}
587-
588-
if len(workloads.Items) == 0 {
589-
log.V(3).Info("No Workloads found for the JobSet – skipping handling")
590-
return
591-
}
592-
593-
for _, wl := range workloads.Items {
594-
if shouldHandleWorkload(&wl) {
595-
req := reconcile.Request{
596-
NamespacedName: types.NamespacedName{
597-
Name: wl.Name,
598-
Namespace: wl.Namespace,
599-
},
600-
}
601-
q.AddAfter(req, updatesBatchPeriod)
602-
}
603-
}
604-
}
605-
606-
var _ handler.EventHandler = (*podHandler)(nil)
607-
608-
type podHandler struct {
609-
client client.Client
610-
}
611-
612-
func (h *podHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
613-
}
614-
615-
func (h *podHandler) Create(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
616-
}
617-
618-
func (h *podHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
619-
h.handleEvent(ctx, e.Object, q)
620-
}
621-
622-
func (h *podHandler) Update(context.Context, event.UpdateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
623-
}
624-
625-
func (h *podHandler) handleEvent(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
626-
pod, isPod := obj.(*corev1.Pod)
627-
// Only Pods and Pods with the TAS label should be handled.
628-
if !isPod || pod.Labels[kueuealpha.TASLabel] != tasLabelValue {
629-
return
630-
}
631-
632-
jobSetName := pod.Labels[jobset.JobSetNameKey]
633-
// Only pods owned by a JobSet should be handled.
634-
if jobSetName == "" {
635-
return
636-
}
637-
638-
log := ctrl.LoggerFrom(ctx).WithValues(
639-
"pod", klog.KObj(pod),
640-
"jobSet", klog.KRef(pod.Namespace, jobSetName),
641-
)
642-
ctrl.LoggerInto(ctx, log)
643-
644-
log.V(3).Info("Handle Pod event")
645-
646-
jobSet := &jobset.JobSet{}
647-
jobSetKey := types.NamespacedName{Name: jobSetName, Namespace: pod.Namespace}
648-
err := h.client.Get(ctx, jobSetKey, jobSet)
649-
if err != nil {
650-
if apierrors.IsNotFound(err) {
651-
log.V(3).Info("JobSet not found")
652-
} else {
653-
log.Error(err, "Failed to get JobSet")
654-
}
655-
return
656-
}
657-
658-
handleEventForJobSet(ctx, h.client, jobSet, q)
659-
}

0 commit comments

Comments
 (0)