From 0f97a577d36a3a4f9835bbda8857a96c237a88a0 Mon Sep 17 00:00:00 2001 From: Mykhailo Bobrovskyi Date: Wed, 2 Jul 2025 15:05:59 +0300 Subject: [PATCH 1/3] Remove Slice finalizer, once all Pods in the Workload have gracefully terminated. --- slice/cmd/main.go | 8 +- slice/config/rbac/role.yaml | 16 + slice/go.mod | 10 + slice/go.sum | 16 +- slice/internal/controller/indexer.go | 42 ++ .../controller/workload_controller.go | 211 +++++++++- .../controller/workload_controller_test.go | 372 +++++++++++++++--- slice/internal/core/slice.go | 17 + slice/internal/util/pod/pod.go | 21 + slice/internal/util/slices/slices.go | 30 ++ slice/internal/util/testing/client.go | 14 + slice/internal/util/testing/wrappers.go | 15 + .../util/testingjobs/jobset/wrappers.go | 37 +- .../internal/util/testingjobs/pod/wrappers.go | 90 +++++ slice/test/e2e/jobset_test.go | 111 ++++++ slice/test/utils/utils.go | 18 + 16 files changed, 931 insertions(+), 97 deletions(-) create mode 100644 slice/internal/controller/indexer.go create mode 100644 slice/internal/util/pod/pod.go create mode 100644 slice/internal/util/slices/slices.go create mode 100644 slice/internal/util/testingjobs/pod/wrappers.go diff --git a/slice/cmd/main.go b/slice/cmd/main.go index f7f295cf4..4173f543a 100644 --- a/slice/cmd/main.go +++ b/slice/cmd/main.go @@ -232,12 +232,18 @@ func main() { } } + ctx := ctrl.SetupSignalHandler() + if err := controller.SetupWorkloadIndexer(ctx, mgr.GetFieldIndexer()); err != nil { + setupLog.Error(err, "unable to setup indexes") + os.Exit(1) + } + go setupControllers(mgr, certsReady) setupProbeEndpoints(mgr, certsReady) setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git a/slice/config/rbac/role.yaml b/slice/config/rbac/role.yaml index bc18a0d35..9685646d3 100644 --- a/slice/config/rbac/role.yaml +++ b/slice/config/rbac/role.yaml @@ -13,6 +13,14 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch - apiGroups: - "" resources: @@ -31,6 +39,14 @@ rules: - list - update - watch +- apiGroups: + - jobset.x-k8s.io + resources: + - jobsets + verbs: + - get + - list + - watch - apiGroups: - kueue.x-k8s.io resources: diff --git a/slice/go.mod b/slice/go.mod index eba2c588a..dd7adb241 100644 --- a/slice/go.mod +++ b/slice/go.mod @@ -18,6 +18,16 @@ require ( sigs.k8s.io/kueue v0.13.0 ) +require ( + github.com/cert-manager/cert-manager v1.18.2 // indirect + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/moby/spdystream v0.5.0 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect + sigs.k8s.io/gateway-api v1.1.0 // indirect + sigs.k8s.io/lws v0.6.2 // indirect +) + require ( cel.dev/expr v0.19.1 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect diff --git a/slice/go.sum b/slice/go.sum index e413668c2..ed451958d 100644 --- a/slice/go.sum +++ b/slice/go.sum @@ -2,12 +2,16 @@ cel.dev/expr v0.19.1 h1:NciYrtDRIR0lNCnH1LFJegdjspNx9fI59O7TWcua/W4= cel.dev/expr v0.19.1/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw= github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 
h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cert-manager/cert-manager v1.18.2 h1:H2P75ycGcTMauV3gvpkDqLdS3RSXonWF2S49QGA1PZE= +github.com/cert-manager/cert-manager v1.18.2/go.mod h1:icDJx4kG9BCNpGjBvrmsFd99d+lXUvWdkkcrSSQdIiw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= @@ -19,8 +23,8 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= -github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch v5.9.0+incompatible h1:fBXyNpNMuTTDdquAq/uisOr2lShz4oaXpDTX2bLe7ls= +github.com/evanphx/json-patch v5.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU= github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -64,6 +68,8 @@ github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -88,6 +94,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= +github.com/moby/spdystream v0.5.0 
h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU= +github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -96,6 +104,8 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus= github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8= github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y= @@ -277,6 +287,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUo sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/controller-runtime v0.21.0 h1:CYfjpEuicjUecRk+KAeyYh+ouUBn4llGyDYytIGcJS8= sigs.k8s.io/controller-runtime v0.21.0/go.mod h1:OSg14+F65eWqIu4DceX7k/+QRAbTTvxeQSNSOQpukWM= +sigs.k8s.io/gateway-api v1.1.0 h1:DsLDXCi6jR+Xz8/xd0Z1PYl2Pn0TyaFMOPPZIj4inDM= +sigs.k8s.io/gateway-api v1.1.0/go.mod h1:ZH4lHrL2sDi0FHZ9jjneb8kKnGzFWyrTya35sWUTrRs= sigs.k8s.io/jobset v0.8.2 h1:WC5a5G7MqfJJy4p+6OxGMpfbB90KoDSay96Mc4yMMZM= sigs.k8s.io/jobset v0.8.2/go.mod h1:yitjuGOExl2p964nhyevQGIkfiPSRHcdC3zNBneKCT8= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= diff --git a/slice/internal/controller/indexer.go b/slice/internal/controller/indexer.go new file mode 100644 index 000000000..2ec7b60ed --- /dev/null +++ b/slice/internal/controller/indexer.go @@ -0,0 +1,42 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + + "tpu-slice-controller/internal/util/slices" +) + +const ( + OwnerReferenceUID = "metadata.ownerReferences.uid" +) + +// SetupWorkloadIndexer configures the indexer to index specific fields for kueue.Workload resources. 
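+//
+// A minimal call site (sketch; this mirrors the cmd/main.go change in this
+// patch, where mgr is the controller-runtime Manager):
+//
+//	ctx := ctrl.SetupSignalHandler()
+//	if err := controller.SetupWorkloadIndexer(ctx, mgr.GetFieldIndexer()); err != nil {
+//		setupLog.Error(err, "unable to setup indexes")
+//		os.Exit(1)
+//	}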
+func SetupWorkloadIndexer(ctx context.Context, indexer client.FieldIndexer) error {
+	if err := indexer.IndexField(ctx, &kueue.Workload{}, OwnerReferenceUID, func(obj client.Object) []string {
+		return slices.Map(obj.GetOwnerReferences(), func(o *metav1.OwnerReference) string { return string(o.UID) })
+	}); err != nil {
+		return fmt.Errorf("setting index on ownerReferences.uid for Workload: %w", err)
+	}
+	return nil
+}
diff --git a/slice/internal/controller/workload_controller.go b/slice/internal/controller/workload_controller.go
index aaf97ea6a..75e137d03 100644
--- a/slice/internal/controller/workload_controller.go
+++ b/slice/internal/controller/workload_controller.go
@@ -38,7 +38,9 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/util/admissioncheck"
 	"sigs.k8s.io/kueue/pkg/workload"
@@ -47,6 +49,7 @@ import (
 	"tpu-slice-controller/internal/core"
 	"tpu-slice-controller/internal/topology"
 	"tpu-slice-controller/internal/util/api"
+	utilpod "tpu-slice-controller/internal/util/pod"
 )
 
 const (
@@ -60,6 +63,7 @@ const (
 
 const (
 	updatesBatchPeriod = time.Second
+	cleanupRetryAfter  = 5 * time.Second
 )
 
 var (
@@ -88,6 +92,8 @@ func NewWorkloadReconciler(cl client.Client, record record.EventRecorder) *Workl
 // +kubebuilder:rbac:groups=slice.accelerator.gke.io,resources=slices,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=slice.accelerator.gke.io,resources=slices/finalizers,verbs=update
 // +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
+// +kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch
+// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
 
 func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	wl := &kueue.Workload{}
@@ -100,22 +106,16 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	log.V(3).Info("Reconcile Workload")
 
 	if finalize, reason := shouldFinalize(wl); finalize {
-		if controllerutil.ContainsFinalizer(wl, SliceControllerName) {
-			log.V(3).Info(fmt.Sprintf("Cleaning up the Slice and finalize the Workload because %s", reason))
-			err = r.client.Delete(ctx, core.SliceWithMetadata(wl))
-			if client.IgnoreNotFound(err) != nil {
-				return ctrl.Result{}, err
-			}
-			controllerutil.RemoveFinalizer(wl, SliceControllerName)
-			if err := r.client.Update(ctx, wl); err != nil {
-				if !apierrors.IsNotFound(err) {
-					log.Error(err, "Failed to remove finalizer")
-				}
-				return ctrl.Result{}, client.IgnoreNotFound(err)
-			}
-			log.V(3).Info("Removed finalizer")
+		log.V(3).Info(fmt.Sprintf("Cleaning up the Slice and finalizing the Workload because %s", reason))
+		cleanedUp, err := r.cleanupSlice(ctx, wl)
+		if err != nil {
+			return ctrl.Result{}, err
 		}
-		return ctrl.Result{}, nil
+		if !cleanedUp {
+			return ctrl.Result{RequeueAfter: cleanupRetryAfter}, nil
+		}
+		err = r.finalizeWorkload(ctx, wl)
+		return ctrl.Result{}, client.IgnoreNotFound(err)
 	}
 
 	if err = validateRelevantWorkload(wl); err != nil {
@@ -179,10 +179,141 @@ func shouldFinalize(wl *kueue.Workload) (bool, string) {
 		return true, "it is no longer active"
 	}
 
+	if !controllerutil.HasControllerReference(wl) {
+		return true, "it doesn't have an owner"
+	}
+
+	if !hasSupportedOwner(wl) {
+		return true, "it has an unsupported owner"
+	}
+
 	return false, ""
 }
 
+func hasSupportedOwner(wl *kueue.Workload) bool {
+	// For now, we only support JobSets.
+	return isJobSetOwner(wl)
+}
+
+func isJobSetOwner(wl *kueue.Workload) bool {
+	if owner := metav1.GetControllerOf(wl); owner != nil {
+		return owner.APIVersion == jobset.SchemeGroupVersion.String() && owner.Kind == "JobSet"
+	}
+	return false
+}
+
+func (r *WorkloadReconciler) cleanupSlice(ctx context.Context, wl *kueue.Workload) (bool, error) {
+	slice := v1alpha1.Slice{}
+	sliceKey := core.SliceKeyFromWorkload(wl)
+
+	log := ctrl.LoggerFrom(ctx).WithValues("slice", klog.KRef(sliceKey.Namespace, sliceKey.Name))
+	ctrl.LoggerInto(ctx, log)
+
+	err := r.client.Get(ctx, sliceKey, &slice)
+	if apierrors.IsNotFound(err) {
+		// slice not found
+		return true, nil
+	} else if err != nil {
+		// error fetching slice
+		log.Error(err, "Failed to fetch the Slice")
+		return false, err
+	}
+
+	if !slice.DeletionTimestamp.IsZero() {
+		log.V(3).Info("Slice already deleted, finishing cleanup")
+		return true, nil
+	}
+
+	if !core.Deformed(&slice) {
+		terminated, err := r.ownerPodsFinished(ctx, wl)
+		if err != nil || !terminated {
+			return false, err
+		}
+	} else {
+		log.V(3).Info("Slice in deformed state")
+		// We still need to delete the Slice because requeueing causes a conflict error during Slice creation.
+	}
+
+	log.V(3).Info("Deleting the Slice")
+
+	err = r.client.Delete(ctx, &slice)
+	if apierrors.IsNotFound(err) {
+		return true, nil
+	} else if err != nil {
+		log.Error(err, "Failed to delete the Slice")
+	}
+
+	return true, err
+}
+
+func (r *WorkloadReconciler) ownerPodsFinished(ctx context.Context, wl *kueue.Workload) (bool, error) {
+	// For now, we only support JobSets.
+	if isJobSetOwner(wl) {
+		return r.jobSetPodsFinished(ctx, wl)
+	}
+	// Finalize Workloads that have no owner or have unsupported owner types.
+	return true, nil
+}
+
+func (r *WorkloadReconciler) jobSetPodsFinished(ctx context.Context, wl *kueue.Workload) (bool, error) {
+	owner := metav1.GetControllerOf(wl)
+	log := ctrl.LoggerFrom(ctx).WithValues("jobSet", klog.KRef(wl.Namespace, owner.Name))
+	jobSet := &jobset.JobSet{}
+	jobSetKey := types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace}
+	if err := r.client.Get(ctx, jobSetKey, jobSet); err != nil {
+		if apierrors.IsNotFound(err) {
+			log.V(3).Info("JobSet already deleted")
+			// That means the JobSet has already been deleted, along with all associated Jobs and Pods,
+			// so we should delete the Slice and clean up the Workload.
+			return true, nil
+		} else {
+			log.Error(err, "Failed to get JobSet")
+			return false, err
+		}
+	}
+
+	pods := &corev1.PodList{}
+	opts := []client.ListOption{
+		client.InNamespace(wl.Namespace),
+		client.MatchingLabels{jobset.JobSetNameKey: owner.Name},
+	}
+	if err := r.client.List(ctx, pods, opts...); err != nil {
+		log.Error(err, "Failed to get Pods")
+		return false, err
+	}
+
+	for _, pod := range pods.Items {
+		if !utilpod.IsTerminated(&pod) {
+			log.V(3).Info("Pods are still running – skipping finalization for now")
+			return false, nil
+		}
+	}
+
+	log.V(3).Info("All Pods in the JobSet have finished")
+
+	return true, nil
+}
+
+func (r *WorkloadReconciler) finalizeWorkload(ctx context.Context, wl *kueue.Workload) error {
+	log := ctrl.LoggerFrom(ctx)
+
+	controllerutil.RemoveFinalizer(wl, SliceControllerName)
+	if err := r.client.Update(ctx, wl); err != nil {
+		if !apierrors.IsNotFound(err) {
+			log.Error(err, "Failed to remove the finalizer")
+		}
+		return err
+	}
+
+	log.V(3).Info("Removed finalizer")
+
+	return nil
+}
+
 func validateRelevantWorkload(wl *kueue.Workload) error {
+	if !hasSupportedOwner(wl) {
+		return errors.New("does not have a supported owner")
+	}
 	if !hasRelevantPodSet(wl.Spec.PodSets) {
 		return errors.New("does not have a relevant podset")
 	}
@@ -287,15 +418,15 @@ func (r *WorkloadReconciler) syncAdmissionCheckStatus(ctx context.Context, wl *k
 	errCond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
 
 	switch {
-	case meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Forming)):
+	case core.Forming(slice):
 		ac.Message = fmt.Sprintf("The Slice %q is being formed", slice.Name)
-	case meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Ready)):
+	case core.Ready(slice):
 		ac.State = kueue.CheckStateReady
 		ac.Message = fmt.Sprintf("The Slice %q is fully operational", slice.Name)
-	case meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Degraded)):
+	case core.Degraded(slice):
 		ac.State = kueue.CheckStateReady
 		ac.Message = fmt.Sprintf("The Slice %q is running with reduced capacity or performance", slice.Name)
-	case meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Deformed)):
+	case core.Deformed(slice):
 		ac.State = kueue.CheckStateRejected
 		ac.Message = fmt.Sprintf("The Slice %q is being torn down", slice.Name)
 	case errCond != nil && errCond.Status == metav1.ConditionTrue:
@@ -317,10 +448,48 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&kueue.Workload{}).
 		Named("workload_controller").
+		WithEventFilter(r).
 		Watches(&v1alpha1.Slice{}, &sliceHandler{client: r.client}).
 		Complete(r)
 }
 
+var _ predicate.Predicate = (*WorkloadReconciler)(nil)
+
+func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
+	return r.handleEvent(e.Object)
+}
+
+func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
+	return r.handleEvent(e.Object)
+}
+
+func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
+	return r.handleEvent(e.ObjectNew)
+}
+
+func (r *WorkloadReconciler) Generic(event.GenericEvent) bool {
+	// Nothing to handle for Generic event.
+	return false
+}
+
+func shouldHandleWorkload(wl *kueue.Workload) bool {
+	// We should handle all Workloads that have the cleanup slice finalizer.
+	if controllerutil.ContainsFinalizer(wl, SliceControllerName) {
+		return true
+	}
+	finalize, _ := shouldFinalize(wl)
+	// If the Workload doesn’t have a finalizer, we can handle only relevant workloads.
+ return !finalize && validateRelevantWorkload(wl) == nil +} + +func (r *WorkloadReconciler) handleEvent(obj client.Object) bool { + wl, isWorkload := obj.(*kueue.Workload) + if !isWorkload { + return true + } + return shouldHandleWorkload(wl) +} + var _ handler.EventHandler = (*sliceHandler)(nil) type sliceHandler struct { @@ -330,8 +499,8 @@ type sliceHandler struct { func (h *sliceHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { } -func (h *sliceHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { - h.handleEvent(ctx, e.Object, q) +func (h *sliceHandler) Create(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { + // No need to handle create event. We should wait for at least Forming state. } func (h *sliceHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { diff --git a/slice/internal/controller/workload_controller_test.go b/slice/internal/controller/workload_controller_test.go index a449a5760..cd4f4ba41 100644 --- a/slice/internal/controller/workload_controller_test.go +++ b/slice/internal/controller/workload_controller_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -43,6 +44,8 @@ import ( slice "tpu-slice-controller/api/v1alpha1" "tpu-slice-controller/internal/core" utiltesting "tpu-slice-controller/internal/util/testing" + utiltestingjobsjobset "tpu-slice-controller/internal/util/testingjobs/jobset" + utiltestingjobspod "tpu-slice-controller/internal/util/testingjobs/pod" ) var ( @@ -50,20 +53,33 @@ var ( cmpopts.EquateEmpty(), cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), + cmpopts.EquateApproxTime(time.Second), } errTest = errors.New("test error") ) func TestWorkloadReconciler(t *testing.T) { + const ( + baseJobName = "job" + baseJobSetName = "jobset" + basePod1Name = "pod1" + basePod2Name = "pod2" + baseWorkloadName = "workload" + ) + now := time.Now().Truncate(time.Second) fakeClock := testingclock.NewFakeClock(now) - baseWorkloadName := "workload" baseAdmissionCheckName := "ac" baseRequest := types.NamespacedName{Name: baseWorkloadName, Namespace: corev1.NamespaceDefault} + baseJobSetWrapper := utiltestingjobsjobset.MakeJobSet(baseJobSetName, corev1.NamespaceDefault) + basePod1Wrapper := utiltestingjobspod.MakePod(basePod1Name, corev1.NamespaceDefault). + OwnerReference(baseJobSetName, jobset.SchemeGroupVersion.WithKind("JobSet")). + Label(jobset.JobSetNameKey, baseJobSetName) + basePod2Wrapper := basePod1Wrapper.Clone().Name(basePod2Name) baseAdmissionCheckWrapper := utiltesting.MakeAdmissionCheck(baseAdmissionCheckName).ControllerName(SliceControllerName) baseWorkloadWrapper := utiltesting.MakeWorkload(baseWorkloadName, corev1.NamespaceDefault). - UID(types.UID(baseWorkloadName)). + UID(baseWorkloadName). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStatePending, @@ -81,6 +97,8 @@ func TestWorkloadReconciler(t *testing.T) { NodeSelector(core.TPUAcceleratorLabel, "tpu-v7x"). Obj(), ) + baseWorkloadWrapperWithPodSetsAndOwner := baseWorkloadWrapperWithPodSets.Clone(). 
+ ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), baseJobSetName, baseJobSetName) baseWorkloadWrapperWithAdmission := baseWorkloadWrapperWithPodSets.Clone(). ReserveQuota( &kueue.Admission{ @@ -103,6 +121,9 @@ func TestWorkloadReconciler(t *testing.T) { }, }, now, ) + baseWorkloadWrapperWithAdmissionAndOwner := baseWorkloadWrapperWithAdmission.Clone(). + ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), baseJobSetName, baseJobSetName) + baseWorkloadWrapperWithFinalizer := baseWorkloadWrapperWithAdmissionAndOwner.Clone().Finalizers(SliceControllerName) baseSliceWrapper := utiltesting.MakeSliceWrapper(baseWorkloadName, corev1.NamespaceDefault). ControllerReference(kueue.GroupVersion.WithKind("Workload"), baseWorkloadName, baseWorkloadName). NodeSelector(map[string][]string{TPUReservationSubblockLabel: {"subblock1", "subblock2"}}) @@ -117,50 +138,200 @@ func TestWorkloadReconciler(t *testing.T) { wantEvents []utiltesting.EventRecord }{ "should skip reconciliation because the Workload was not found": { - request: types.NamespacedName{Name: "other-workload", Namespace: corev1.NamespaceDefault}, + request: types.NamespacedName{Name: "other-workload", Namespace: corev1.NamespaceDefault}, + objs: []client.Object{baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).Obj()}, + wantWorkloads: []kueue.Workload{*baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).Obj()}, + }, + "should delete the finalizer because the Workload has a DeletionTimestamp": { + request: baseRequest, + objs: []client.Object{ + baseAdmissionCheckWrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + DeletionTimestamp(now). + Finalizers(SliceControllerName). + Obj(), + }, + }, + "should delete the finalizer because the Workload is finished": { + request: baseRequest, + objs: []client.Object{ + baseAdmissionCheckWrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Finished(). + Finalizers(SliceControllerName). + Obj(), + }, + wantWorkloads: []kueue.Workload{*baseWorkloadWrapperWithAdmissionAndOwner.Clone().Finished().Obj()}, + }, + "should delete the finalizer because the Workload is evicted": { + request: baseRequest, + objs: []client.Object{ + baseAdmissionCheckWrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Evicted(). + Finalizers(SliceControllerName). + Obj(), + }, + wantWorkloads: []kueue.Workload{*baseWorkloadWrapperWithAdmissionAndOwner.Clone().Evicted().Obj()}, + }, + "should delete the finalizer because the Workload is deactivated": { + request: baseRequest, + objs: []client.Object{ + baseAdmissionCheckWrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), + }, + wantWorkloads: []kueue.Workload{*baseWorkloadWrapperWithAdmissionAndOwner.Clone().Active(false).Obj()}, + }, + "should delete the finalizer because the Workload has no owner": { + request: baseRequest, + objs: []client.Object{ + baseAdmissionCheckWrapper.DeepCopy(), + baseWorkloadWrapperWithAdmission.Clone().Finalizers(SliceControllerName).Obj(), + }, + wantWorkloads: []kueue.Workload{*baseWorkloadWrapperWithAdmission.DeepCopy()}, + }, + "should delete the finalizer because the Workload has an unsupported owner": { + request: baseRequest, + objs: []client.Object{ + baseAdmissionCheckWrapper.DeepCopy(), + baseWorkloadWrapperWithAdmission.Clone(). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), baseJobName, baseJobName). + Finalizers(SliceControllerName). 
+ Obj(), + }, + wantWorkloads: []kueue.Workload{ + *baseWorkloadWrapperWithAdmission.Clone(). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), baseJobName, baseJobName). + Obj(), + }, + }, + "should delete the finalizer because the Slice status Deformed": { + request: baseRequest, objs: []client.Object{ - baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).DeletionTimestamp(now).Obj(), + baseAdmissionCheckWrapper.DeepCopy(), + baseJobSetWrapper.DeepCopy(), + basePod1Wrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), + baseSliceWrapper.Clone().Deformed().Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).DeletionTimestamp(now).Obj(), + *baseWorkloadWrapperWithAdmissionAndOwner.Clone().Active(false).Obj(), }, }, - "should delete the finalizer because the Workload has a DeletionTimestamp": { + "shouldn't delete the finalizer because the Slice status Degraded": { request: baseRequest, objs: []client.Object{ - baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).DeletionTimestamp(now).Obj(), + baseAdmissionCheckWrapper.DeepCopy(), + baseJobSetWrapper.DeepCopy(), + basePod1Wrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), + baseSliceWrapper.Clone().Degraded().Obj(), + }, + wantWorkloads: []kueue.Workload{ + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), + }, + wantSlices: []slice.Slice{ + *baseSliceWrapper.Clone().Degraded().Obj(), + }, + }, + "should delete the Slice because the Pod Status Succeeded": { + request: baseRequest, + objs: []client.Object{ + baseJobSetWrapper.DeepCopy(), + basePod1Wrapper.Clone().StatusPhase(corev1.PodSucceeded).Obj(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), baseSliceWrapper.DeepCopy(), }, + wantWorkloads: []kueue.Workload{ + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Obj(), + }, }, - "should delete the finalizer because the Workload is finished": { + "should delete the Slice because the Pod Status PodFailed": { request: baseRequest, objs: []client.Object{ - baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).Finished().Obj(), + baseAdmissionCheckWrapper.DeepCopy(), + baseJobSetWrapper.DeepCopy(), + basePod1Wrapper.Clone().StatusPhase(corev1.PodFailed).Obj(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), baseSliceWrapper.DeepCopy(), }, - wantWorkloads: []kueue.Workload{*baseWorkloadWrapper.Clone().Finished().Obj()}, + wantWorkloads: []kueue.Workload{ + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Obj(), + }, }, - "should delete the finalizer because the Workload is evicted": { + "shouldn't delete the Slice because the Pods still running": { request: baseRequest, objs: []client.Object{ - baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).Evicted().Obj(), + baseAdmissionCheckWrapper.DeepCopy(), + baseJobSetWrapper.DeepCopy(), + basePod1Wrapper.DeepCopy(), + basePod2Wrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). 
+ Obj(), baseSliceWrapper.DeepCopy(), }, - wantWorkloads: []kueue.Workload{*baseWorkloadWrapper.Clone().Evicted().Obj()}, + wantWorkloads: []kueue.Workload{ + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), + }, + wantSlices: []slice.Slice{ + *baseSliceWrapper.DeepCopy(), + }, }, - "should delete the finalizer because the Workload is deactivated": { + "shouldn't delete the Slice because one of the Pods still running": { request: baseRequest, objs: []client.Object{ - baseWorkloadWrapper.Clone().Finalizers(SliceControllerName).Active(false).Obj(), + baseAdmissionCheckWrapper.DeepCopy(), + baseJobSetWrapper.DeepCopy(), + basePod1Wrapper.Clone().StatusPhase(corev1.PodSucceeded).Obj(), + basePod2Wrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), baseSliceWrapper.DeepCopy(), }, - wantWorkloads: []kueue.Workload{*baseWorkloadWrapper.Clone().Active(false).Obj()}, + wantWorkloads: []kueue.Workload{ + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Active(false). + Finalizers(SliceControllerName). + Obj(), + }, + wantSlices: []slice.Slice{ + *baseSliceWrapper.DeepCopy(), + }, }, "shouldn't add finalizer because invalid TPU topology annotation": { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapper.Clone(). + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). PodSets( *utiltesting.MakePodSet("ps", 2). Annotation(core.TPUTopologyAnnotation, "4x4"). @@ -183,7 +354,7 @@ func TestWorkloadReconciler(t *testing.T) { Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapper.Clone(). + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). PodSets( *utiltesting.MakePodSet("ps", 2). Annotation(core.TPUTopologyAnnotation, "4x4"). @@ -210,7 +381,7 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapper.Clone(). + baseWorkloadWrapperWithAdmissionAndOwner.Clone(). PodSets( *utiltesting.MakePodSet("ps", 2). Annotation(core.TPUTopologyAnnotation, "4x4x12"). @@ -233,7 +404,7 @@ func TestWorkloadReconciler(t *testing.T) { Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapper.Clone(). + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). PodSets( *utiltesting.MakePodSet("ps", 2). Annotation(core.TPUTopologyAnnotation, "4x4x12"). @@ -260,70 +431,67 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithPodSets.DeepCopy(), + baseWorkloadWrapperWithPodSetsAndOwner.DeepCopy(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithPodSets.DeepCopy(), + *baseWorkloadWrapperWithPodSetsAndOwner.DeepCopy(), }, }, "shouldn't add finalizer because there’s no TopologyAssignment": { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithPodSets.Clone(). + baseWorkloadWrapperWithPodSetsAndOwner.Clone(). ReserveQuota( &kueue.Admission{ PodSetAssignments: []kueue.PodSetAssignment{ utiltesting.MakePodSetAssignment("ps1").Obj(), - utiltesting.MakePodSetAssignment("ps2").Obj(), }, }, now, ). Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithPodSets.Clone(). + *baseWorkloadWrapperWithPodSetsAndOwner.Clone(). 
ReserveQuota( &kueue.Admission{ PodSetAssignments: []kueue.PodSetAssignment{ utiltesting.MakePodSetAssignment("ps1").Obj(), - utiltesting.MakePodSetAssignment("ps2").Obj(), }, }, now, ). Obj(), }, }, - "shouldn't add finalizer because there’s no AdmissionCheck": { + "should add finalizer": { request: baseRequest, objs: []client.Object{ - baseWorkloadWrapperWithAdmission.DeepCopy(), + baseAdmissionCheckWrapper.DeepCopy(), + baseWorkloadWrapperWithAdmissionAndOwner.DeepCopy(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.DeepCopy(), + *baseWorkloadWrapperWithAdmissionAndOwner.Clone(). + Finalizers(SliceControllerName). + Obj(), }, }, - "should add finalizer": { + "shouldn't create a Slice because there’s no AdmissionCheck": { request: baseRequest, objs: []client.Object{ - baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). - Obj(), + *baseWorkloadWrapperWithFinalizer.DeepCopy(), }, }, "should create a Slice": { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStatePending, @@ -346,11 +514,10 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Clone().Finalizers(SliceControllerName).Obj(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStatePending, @@ -381,11 +548,10 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStatePending, @@ -408,12 +574,11 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), baseSliceWrapper.Clone().Forming().Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). 
AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStatePending, @@ -428,12 +593,11 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), baseSliceWrapper.Clone().Ready().Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStateReady, @@ -456,12 +620,11 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), baseSliceWrapper.Clone().Degraded().Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStateReady, @@ -484,12 +647,11 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), baseSliceWrapper.Clone().Deformed().Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStateRejected, @@ -512,12 +674,11 @@ func TestWorkloadReconciler(t *testing.T) { request: baseRequest, objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), baseSliceWrapper.Clone().Error().Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). AdmissionCheck(kueue.AdmissionCheckState{ Name: kueue.AdmissionCheckReference(baseAdmissionCheckName), State: kueue.CheckStateRejected, @@ -541,12 +702,11 @@ func TestWorkloadReconciler(t *testing.T) { objs: []client.Object{ baseAdmissionCheckWrapper.DeepCopy(), baseAdmissionCheckWrapper.Clone().Name(baseAdmissionCheckName + "2").Obj(), - baseWorkloadWrapperWithAdmission.Finalizers(SliceControllerName).DeepCopy(), + baseWorkloadWrapperWithFinalizer.DeepCopy(), baseSliceWrapper.Clone().Ready().Obj(), }, wantWorkloads: []kueue.Workload{ - *baseWorkloadWrapperWithAdmission.Clone(). - Finalizers(SliceControllerName). + *baseWorkloadWrapperWithFinalizer.Clone(). 
 					AdmissionCheck(kueue.AdmissionCheckState{
 						Name:  kueue.AdmissionCheckReference(baseAdmissionCheckName),
 						State: kueue.CheckStateReady,
@@ -569,6 +729,8 @@ func TestWorkloadReconciler(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			scheme := runtime.NewScheme()
+			utilruntime.Must(corev1.AddToScheme(scheme))
+			utilruntime.Must(jobset.AddToScheme(scheme))
 			utilruntime.Must(kueue.AddToScheme(scheme))
 			utilruntime.Must(slice.AddToScheme(scheme))
 
@@ -618,6 +780,87 @@ func TestWorkloadReconciler(t *testing.T) {
 	}
 }
 
+func TestWorkloadReconcilerHandleEvent(t *testing.T) {
+	now := time.Now().Truncate(time.Second)
+
+	baseWorkloadWithAdmission := utiltesting.MakeWorkload("wl", corev1.NamespaceDefault).
+		PodSets(
+			*utiltesting.MakePodSet("ps", 2).
+				Annotation(core.TPUTopologyAnnotation, "4x4x12").
+				NodeSelector(core.TPUAcceleratorLabel, "tpu-v7x").
+				Obj(),
+		).
+		ReserveQuota(
+			&kueue.Admission{
+				PodSetAssignments: []kueue.PodSetAssignment{
+					utiltesting.MakePodSetAssignment("ps1").
+						TopologyAssignment([]string{core.TPUBlockLabel, core.TPUSubBlockLabel}, []kueue.TopologyDomainAssignment{
+							{
+								Values: []string{"domain1", "domain2"},
+								Count:  2,
+							},
+						}).Obj(),
+				},
+			}, now,
+		)
+	baseWorkloadWithAdmissionOwnerReference := baseWorkloadWithAdmission.Clone().
+		ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset", "jobset")
+
+	cases := map[string]struct {
+		obj  client.Object
+		want bool
+	}{
+		"invalid object": {
+			obj:  utiltestingjobspod.MakePod("pod", corev1.NamespaceDefault).Obj(),
+			want: true,
+		},
+		"has cleanup slice finalizer": {
+			obj: utiltesting.MakeWorkload("wl", corev1.NamespaceDefault).
+				Finalizers(SliceControllerName).
+				Obj(),
+			want: true,
+		},
+		"valid workload": {
+			obj:  baseWorkloadWithAdmissionOwnerReference.DeepCopy(),
+			want: true,
+		},
+		"doesn't have owner reference": {
+			obj:  baseWorkloadWithAdmission.DeepCopy(),
+			want: false,
+		},
+		"has unsupported owner reference": {
+			obj: baseWorkloadWithAdmission.Clone().
+				ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "job").
+				Obj(),
+			want: false,
+		},
+		"has DeletionTimestamp": {
+			obj:  baseWorkloadWithAdmissionOwnerReference.Clone().DeletionTimestamp(now).Obj(),
+			want: false,
+		},
+		"finished": {
+			obj:  baseWorkloadWithAdmissionOwnerReference.Clone().Finished().Obj(),
+			want: false,
+		},
+		"evicted": {
+			obj:  baseWorkloadWithAdmissionOwnerReference.Clone().Evicted().Obj(),
+			want: false,
+		},
+		"deactivated": {
+			obj:  baseWorkloadWithAdmissionOwnerReference.Clone().Active(false).Obj(),
+			want: false,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			got := NewWorkloadReconciler(nil, nil).handleEvent(tc.obj)
+			if diff := cmp.Diff(tc.want, got); diff != "" {
+				t.Errorf("Unexpected handleEvent result (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
 func TestSliceHandlerHandleEvent(t *testing.T) {
 	const (
 		baseWlName = "wl"
@@ -663,6 +906,11 @@ func TestSliceHandlerHandleEvent(t *testing.T) {
 	ctx, _ := utiltesting.ContextWithLog(t)
 
 	clientBuilder := fake.NewClientBuilder().WithScheme(scheme)
+	indexer := utiltesting.AsIndexer(clientBuilder)
+	if err := SetupWorkloadIndexer(ctx, indexer); err != nil {
+		t.Fatalf("Setup failed: %v", err)
+	}
+
 	kClient := clientBuilder.Build()
 
 	testSliceHandler := &sliceHandler{client: kClient}
diff --git a/slice/internal/core/slice.go b/slice/internal/core/slice.go
index 4ab948c5e..e536b3a42 100644
--- a/slice/internal/core/slice.go
+++ b/slice/internal/core/slice.go
@@ -15,6 +15,7 @@
 package core
 
 import (
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -37,3 +38,19 @@ func SliceWithMetadata(wl *kueue.Workload) *v1alpha1.Slice {
 		},
 	}
 }
+
+func Forming(slice *v1alpha1.Slice) bool {
+	return meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Forming))
+}
+
+func Ready(slice *v1alpha1.Slice) bool {
+	return meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Ready))
+}
+
+func Degraded(slice *v1alpha1.Slice) bool {
+	return meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Degraded))
+}
+
+func Deformed(slice *v1alpha1.Slice) bool {
+	return meta.IsStatusConditionTrue(slice.Status.Conditions, string(v1alpha1.Deformed))
+}
diff --git a/slice/internal/util/pod/pod.go b/slice/internal/util/pod/pod.go
new file mode 100644
index 000000000..85d6a74af
--- /dev/null
+++ b/slice/internal/util/pod/pod.go
@@ -0,0 +1,21 @@
+// Copyright The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pod
+
+import corev1 "k8s.io/api/core/v1"
+
+func IsTerminated(p *corev1.Pod) bool {
+	return p.Status.Phase == corev1.PodFailed || p.Status.Phase == corev1.PodSucceeded
+}
diff --git a/slice/internal/util/slices/slices.go b/slice/internal/util/slices/slices.go
new file mode 100644
index 000000000..5c37b8ec2
--- /dev/null
+++ b/slice/internal/util/slices/slices.go
@@ -0,0 +1,30 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package slices
+
+// Map creates a new slice with the same number of elements as the input s,
+// with the values generated by mapFunc.
+func Map[From any, To any, S ~[]From](s S, mapFunc func(*From) To) []To {
+	if s == nil {
+		return nil
+	}
+	ret := make([]To, len(s))
+	for i := range s {
+		ret[i] = mapFunc(&s[i])
+	}
+	return ret
+}
diff --git a/slice/internal/util/testing/client.go b/slice/internal/util/testing/client.go
index 64060ac11..aa04bb7dd 100644
--- a/slice/internal/util/testing/client.go
+++ b/slice/internal/util/testing/client.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 )
 
 type EventRecord struct {
@@ -90,3 +91,16 @@ func wrapSSAPatch(patch client.Patch) client.Patch {
 func TreatSSAAsStrategicMerge(ctx context.Context, clnt client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
 	return clnt.SubResource(subResourceName).Patch(ctx, obj, wrapSSAPatch(patch), opts...)
 }
+
+type builderIndexer struct {
+	*fake.ClientBuilder
+}
+
+func (b *builderIndexer) IndexField(_ context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
+	b.ClientBuilder = b.WithIndex(obj, field, extractValue)
+	return nil
+}
+
+func AsIndexer(builder *fake.ClientBuilder) client.FieldIndexer {
+	return &builderIndexer{ClientBuilder: builder}
+}
diff --git a/slice/internal/util/testing/wrappers.go b/slice/internal/util/testing/wrappers.go
index aedc3d704..8d378e4a0 100644
--- a/slice/internal/util/testing/wrappers.go
+++ b/slice/internal/util/testing/wrappers.go
@@ -162,6 +162,11 @@ func (w *WorkloadWrapper) PodSets(podSets ...kueue.PodSet) *WorkloadWrapper {
 	return w
 }
 
+func (w *WorkloadWrapper) ControllerReference(gvk schema.GroupVersionKind, name, uid string) *WorkloadWrapper {
+	AppendOwnerReference(&w.Workload, gvk, name, uid, ptr.To(true), ptr.To(true))
+	return w
+}
+
 type PodSetWrapper struct{ kueue.PodSet }
 
 func MakePodSet(name kueue.PodSetReference, count int) *PodSetWrapper {
@@ -315,6 +320,16 @@ func (s *SliceWrapper) Error() *SliceWrapper {
 	return s
 }
 
+func (s *SliceWrapper) Finalizers(fin ...string) *SliceWrapper {
+	s.ObjectMeta.Finalizers = fin
+	return s
+}
+
+func (s *SliceWrapper) DeletionTimestamp(t time.Time) *SliceWrapper {
+	s.Slice.DeletionTimestamp = ptr.To(metav1.NewTime(t).Rfc3339Copy())
+	return s
+}
+
 func AppendOwnerReference(obj client.Object, gvk schema.GroupVersionKind, name, uid string, controller, blockDeletion *bool) {
 	obj.SetOwnerReferences(append(obj.GetOwnerReferences(), metav1.OwnerReference{
 		APIVersion: gvk.GroupVersion().String(),
diff --git a/slice/internal/util/testingjobs/jobset/wrappers.go b/slice/internal/util/testingjobs/jobset/wrappers.go
index 3558ddb13..dfa11bf5b 100644
--- a/slice/internal/util/testingjobs/jobset/wrappers.go
+++ b/slice/internal/util/testingjobs/jobset/wrappers.go
@@ -19,6
+19,7 @@ package jobset import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" jobsetapi "sigs.k8s.io/jobset/api/jobset/v1alpha2" jobsetutil "sigs.k8s.io/jobset/pkg/util/testing" @@ -40,16 +41,18 @@ var TestPodSpec = corev1.PodSpec{ } type ReplicatedJobRequirements struct { - Name string - Replicas int32 - Parallelism int32 - Completions int32 - Labels map[string]string - Annotations map[string]string - PodAnnotations map[string]string - NodeSelector map[string]string - Image string - Args []string + Name string + Replicas int32 + Parallelism int32 + Completions int32 + Labels map[string]string + Annotations map[string]string + PodAnnotations map[string]string + NodeSelector map[string]string + Image string + Args []string + TerminationGracePeriodSeconds int64 + LifecyclePreStopSleepSeconds int64 } // MakeJobSet creates a wrapper for a suspended JobSet @@ -63,6 +66,11 @@ func (j *JobSetWrapper) Obj() *jobsetapi.JobSet { return &j.JobSet } +func (j *JobSetWrapper) UID(uid string) *JobSetWrapper { + j.ObjectMeta.UID = types.UID(uid) + return j +} + // ReplicatedJobs sets a new set of ReplicatedJobs in the inner JobSet. func (j *JobSetWrapper) ReplicatedJobs(replicatedJobs ...ReplicatedJobRequirements) *JobSetWrapper { j.Spec.ReplicatedJobs = make([]jobsetapi.ReplicatedJob, len(replicatedJobs)) @@ -78,12 +86,19 @@ func (j *JobSetWrapper) ReplicatedJobs(replicatedJobs ...ReplicatedJobRequiremen jt.Spec.BackoffLimit = ptr.To[int32](0) spec := &jt.Spec.Template.Spec spec.RestartPolicy = corev1.RestartPolicyNever - spec.TerminationGracePeriodSeconds = ptr.To[int64](0) + spec.TerminationGracePeriodSeconds = ptr.To[int64](req.TerminationGracePeriodSeconds) spec.Containers = []corev1.Container{ { Name: "c", Image: req.Image, Args: req.Args, + Lifecycle: &corev1.Lifecycle{ + PreStop: &corev1.LifecycleHandler{ + Sleep: &corev1.SleepAction{ + Seconds: req.LifecyclePreStopSleepSeconds, + }, + }, + }, }, } } diff --git a/slice/internal/util/testingjobs/pod/wrappers.go b/slice/internal/util/testingjobs/pod/wrappers.go new file mode 100644 index 000000000..6ee41c10c --- /dev/null +++ b/slice/internal/util/testingjobs/pod/wrappers.go @@ -0,0 +1,90 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + + "tpu-slice-controller/internal/util/testing" +) + +// PodWrapper wraps a Pod. +type PodWrapper struct { + corev1.Pod +} + +// MakePod creates a wrapper for a pod with a single container. 
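+//
+// Example (sketch, mirroring how the controller tests build Pods with this wrapper):
+//
+//	pod := MakePod("pod1", corev1.NamespaceDefault).
+//		Label(jobset.JobSetNameKey, "jobset").
+//		StatusPhase(corev1.PodSucceeded).
+//		Obj()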
+func MakePod(name, ns string) *PodWrapper {
+	return &PodWrapper{corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        name,
+			Namespace:   ns,
+			Annotations: make(map[string]string, 1),
+		},
+		Spec: corev1.PodSpec{
+			RestartPolicy: corev1.RestartPolicyNever,
+			Containers: []corev1.Container{
+				{
+					Name:      "c",
+					Image:     "pause",
+					Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}, Limits: corev1.ResourceList{}},
+				},
+			},
+			SchedulingGates: make([]corev1.PodSchedulingGate, 0),
+		},
+	}}
+}
+
+// Obj returns the inner Pod.
+func (p *PodWrapper) Obj() *corev1.Pod {
+	return &p.Pod
+}
+
+// Clone returns a deep copy of the Pod.
+func (p *PodWrapper) Clone() *PodWrapper {
+	return &PodWrapper{Pod: *p.DeepCopy()}
+}
+
+// OwnerReference adds an ownerReference to the Pod.
+func (p *PodWrapper) OwnerReference(ownerName string, ownerGVK schema.GroupVersionKind) *PodWrapper {
+	testing.AppendOwnerReference(&p.Pod, ownerGVK, ownerName, ownerName, ptr.To(true), ptr.To(true))
+	return p
+}
+
+// Label sets a label on the Pod.
+func (p *PodWrapper) Label(k, v string) *PodWrapper {
+	if p.Labels == nil {
+		p.Labels = make(map[string]string)
+	}
+	p.Labels[k] = v
+	return p
+}
+
+// StatusPhase sets the status phase of the Pod.
+func (p *PodWrapper) StatusPhase(ph corev1.PodPhase) *PodWrapper {
+	p.Status.Phase = ph
+	return p
+}
+
+// Name updates the name of the Pod.
+func (p *PodWrapper) Name(n string) *PodWrapper {
+	p.ObjectMeta.Name = n
+	return p
+}
diff --git a/slice/test/e2e/jobset_test.go b/slice/test/e2e/jobset_test.go
index 1b84e689a..312cd1246 100644
--- a/slice/test/e2e/jobset_test.go
+++ b/slice/test/e2e/jobset_test.go
@@ -33,6 +33,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	jobsetcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
 	"sigs.k8s.io/kueue/pkg/workload"
+	"sigs.k8s.io/kueue/test/util"
 
 	slice "tpu-slice-controller/api/v1alpha1"
 	"tpu-slice-controller/internal/controller"
@@ -279,6 +280,10 @@ var _ = ginkgo.Describe("JobSet", func() {
 			ginkgo.By("Checking that Slice is deleted", func() {
 				utils.ExpectObjectToBeDeleted(ctx, k8sClient, createdSlice, false)
 			})
+
+			ginkgo.By("Checking that Workload is deleted", func() {
+				utils.ExpectObjectToBeDeleted(ctx, k8sClient, createdWorkload, false)
+			})
 		},
 		ginkgo.Entry("TPU topology 4x4x4 and parallelism 16", testCase{
 			tpuTopology: "4x4x4",
@@ -417,5 +422,111 @@ var _ = ginkgo.Describe("JobSet", func() {
 			},
 		}),
 	)
+
+	ginkgo.It("should delete the Workload finalizer after all Pods have gracefully terminated", func() {
+		jobSet := testingjobsjobset.MakeJobSet("jobset", ns.Name).
+			Queue(lq.Name).
+			ReplicatedJobs(
+				testingjobsjobset.ReplicatedJobRequirements{
+					Name:        "rj1",
+					Image:       utils.E2eTestAgnHostImage,
+					Args:        utils.BehaviorWaitForDeletion,
+					Replicas:    1,
+					Parallelism: 1,
+					Completions: 1,
+					PodAnnotations: map[string]string{
+						core.TPUTopologyAnnotation: "4x4x4",
+					},
+					NodeSelector: map[string]string{
+						core.TPUAcceleratorLabel: tpuAccelerator,
+					},
+					TerminationGracePeriodSeconds: 60,
+					LifecyclePreStopSleepSeconds:  60,
+				},
+			).
+			RequestAndLimit("rj1", extraResource, "1").
+			Obj()
+
+		ginkgo.By("Creating a JobSet", func() {
+			utils.MustCreate(ctx, k8sClient, jobSet)
+		})
+
+		createdWorkload := &kueue.Workload{}
+		wlKey := types.NamespacedName{
+			Name:      jobsetcontroller.GetWorkloadNameForJobSet(jobSet.Name, jobSet.UID),
+			Namespace: ns.Name,
+		}
+
+		createdSlice := &slice.Slice{}
+		sliceKey := types.NamespacedName{
+			Name:      wlKey.Name,
+			Namespace: wlKey.Namespace,
+		}
+
+		ginkgo.By("Checking that Slice is created", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, sliceKey, createdSlice)).To(gomega.Succeed())
+			}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("Adding Ready condition", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, sliceKey, createdSlice)).To(gomega.Succeed())
+				meta.SetStatusCondition(&createdSlice.Status.Conditions, metav1.Condition{
+					Type:    string(slice.Forming),
+					Status:  metav1.ConditionFalse,
+					Reason:  "Test",
+					Message: "Test",
+				})
+				meta.SetStatusCondition(&createdSlice.Status.Conditions, metav1.Condition{
+					Type:    string(slice.Ready),
+					Status:  metav1.ConditionTrue,
+					Reason:  "Test",
+					Message: "Test",
+				})
+				g.Expect(k8sClient.Status().Update(ctx, createdSlice)).To(gomega.Succeed())
+			}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("Checking that the Workload is admitted", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, wlKey, createdWorkload)).Should(gomega.Succeed())
+				g.Expect(workload.IsAdmitted(createdWorkload)).Should(gomega.BeTrue())
+			}, utils.LongTimeout, utils.Timeout).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("Checking that all pods are running", func() {
+			pods := &corev1.PodList{}
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed())
+				g.Expect(pods.Items).Should(gomega.HaveLen(1))
+				for _, pod := range pods.Items {
+					g.Expect(pod.Status.Phase).To(gomega.Equal(corev1.PodRunning))
+				}
+			}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("Deactivating the Workload", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, wlKey, createdWorkload)).Should(gomega.Succeed())
+				createdWorkload.Spec.Active = ptr.To(false)
+				g.Expect(k8sClient.Update(ctx, createdWorkload)).Should(gomega.Succeed())
+			}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("Checking that the Slice still exists while waiting for the Pods to terminate", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, sliceKey, createdSlice)).To(gomega.Succeed())
+			}, utils.ConsistentDuration, utils.ShortInterval).Should(gomega.Succeed())
+		})
+
+		ginkgo.By("Deleting the Pods", func() {
+			utils.DeleteAllPodsInNamespace(ctx, k8sClient, ns)
+		})
+
+		ginkgo.By("Checking that the Slice is deleted", func() {
+			utils.ExpectObjectToBeDeleted(ctx, k8sClient, createdSlice, false)
+		})
+	})
 })
 })
diff --git a/slice/test/utils/utils.go b/slice/test/utils/utils.go
index 70ee6926e..59c6247ea 100644
--- a/slice/test/utils/utils.go
+++ b/slice/test/utils/utils.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"k8s.io/utils/ptr"
 	"os"
 	"os/exec"
 	"strconv"
@@ -46,6 +47,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	podconstants "sigs.k8s.io/kueue/pkg/controller/jobs/pod/constants"
 
+	"tpu-slice-controller/internal/controller"
 	"tpu-slice-controller/internal/util/testing"
 )
 
@@ -153,6 +155,10 @@ func DeleteAllJobsInNamespace(ctx
context.Context, c client.Client, ns *corev1.N return deleteAllObjectsInNamespace(ctx, c, ns, &batchv1.Job{}) } +func DeleteAllPodsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) { + gomega.ExpectWithOffset(1, deleteAllPodsInNamespace(ctx, c, ns, 2)).To(gomega.Succeed()) +} + func deleteAllPodsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace, offset int) error { if err := client.IgnoreNotFound(c.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(ns.Name))); err != nil { return fmt.Errorf("deleting all Pods in namespace %q: %w", ns.Name, err) @@ -165,6 +171,11 @@ func deleteAllPodsInNamespace(ctx context.Context, c client.Client, ns *corev1.N if controllerutil.RemoveFinalizer(&p, podconstants.PodFinalizer) { g.Expect(client.IgnoreNotFound(c.Update(ctx, &p))).Should(gomega.Succeed(), "removing finalizer") } + opts := &client.DeleteOptions{ + GracePeriodSeconds: ptr.To[int64](0), + } + err := c.Delete(ctx, &p, opts) + g.Expect(client.IgnoreNotFound(err)).To(gomega.Succeed()) } }, LongTimeout, Interval).Should(gomega.Succeed()) return nil @@ -178,7 +189,14 @@ func deleteWorkloadsInNamespace(ctx context.Context, c client.Client, ns *corev1 workloads := kueue.WorkloadList{} g.Expect(c.List(ctx, &workloads, client.InNamespace(ns.Name))).Should(gomega.Succeed()) for _, wl := range workloads.Items { + update := false if controllerutil.RemoveFinalizer(&wl, kueue.ResourceInUseFinalizerName) { + update = true + } + if controllerutil.RemoveFinalizer(&wl, controller.SliceControllerName) { + update = true + } + if update { g.Expect(client.IgnoreNotFound(c.Update(ctx, &wl))).Should(gomega.Succeed()) } } From 48c85aa6b305b7661198ece8f4136885526c8aad Mon Sep 17 00:00:00 2001 From: Mykhailo Bobrovskyi Date: Thu, 31 Jul 2025 14:22:56 +0300 Subject: [PATCH 2/3] Revert Predicate implementations. 
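
With the event filter removed, every Workload event is enqueued and
Reconcile itself decides whether anything needs to be done, gating the
Slice cleanup on the presence of the finalizer. This guarantees that
events for Workloads that only become relevant for cleanup are never
dropped before the finalizer check runs.

For context, a minimal sketch of the controller-runtime predicate
pattern that this patch removes (the interface is controller-runtime's;
the implementation below is illustrative, not the original code):

    package controller

    import (
        "sigs.k8s.io/controller-runtime/pkg/event"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
    )

    // workloadPredicate filters events before they reach the workqueue;
    // returning false drops the event without triggering a reconcile.
    type workloadPredicate struct{}

    var _ predicate.Predicate = workloadPredicate{}

    func (workloadPredicate) Create(event.CreateEvent) bool   { return true }
    func (workloadPredicate) Delete(event.DeleteEvent) bool   { return true }
    func (workloadPredicate) Update(event.UpdateEvent) bool   { return true }
    func (workloadPredicate) Generic(event.GenericEvent) bool { return false }

Such a predicate is registered via WithEventFilter on the controller
builder; dropping it trades a little extra reconcile churn for the
guarantee that no cleanup-relevant event is filtered out.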
---
 .../controller/workload_controller.go         | 60 +++-----------
 .../controller/workload_controller_test.go    | 81 -------------------
 2 files changed, 12 insertions(+), 129 deletions(-)

diff --git a/slice/internal/controller/workload_controller.go b/slice/internal/controller/workload_controller.go
index 75e137d03..bc02bdd55 100644
--- a/slice/internal/controller/workload_controller.go
+++ b/slice/internal/controller/workload_controller.go
@@ -38,7 +38,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
-	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -106,16 +105,19 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	log.V(3).Info("Reconcile Workload")
 
 	if finalize, reason := shouldFinalize(wl); finalize {
-		log.V(3).Info(fmt.Sprintf("Cleaning up the Slice and finalize the Workload because %s", reason))
-		cleanedUp, err := r.cleanupSlice(ctx, wl)
-		if err != nil {
-			return ctrl.Result{}, err
-		}
-		if !cleanedUp {
-			return ctrl.Result{RequeueAfter: cleanupRetryAfter}, err
+		if controllerutil.ContainsFinalizer(wl, SliceControllerName) {
+			log.V(3).Info(fmt.Sprintf("Cleaning up the Slice and finalizing the Workload because %s", reason))
+			cleanedUp, err := r.cleanupSlice(ctx, wl)
+			if err != nil {
+				return ctrl.Result{}, err
+			}
+			if !cleanedUp {
+				return ctrl.Result{RequeueAfter: cleanupRetryAfter}, nil
+			}
+			err = r.finalizeWorkload(ctx, wl)
+			return ctrl.Result{}, client.IgnoreNotFound(err)
 		}
-		err = r.finalizeWorkload(ctx, wl)
-		return ctrl.Result{}, client.IgnoreNotFound(err)
+		return ctrl.Result{}, nil
 	}
 
 	if err = validateRelevantWorkload(wl); err != nil {
@@ -448,48 +450,10 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&kueue.Workload{}).
 		Named("workload_controller").
-		WithEventFilter(r).
 		Watches(&v1alpha1.Slice{}, &sliceHandler{client: r.client}).
 		Complete(r)
 }
 
-var _ predicate.Predicate = (*WorkloadReconciler)(nil)
-
-func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
-	return r.handleEvent(e.Object)
-}
-
-func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
-	return r.handleEvent(e.Object)
-}
-
-func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
-	return r.handleEvent(e.ObjectNew)
-}
-
-func (r *WorkloadReconciler) Generic(event.GenericEvent) bool {
-	// Nothing handle for Generic event.
-	return false
-}
-
-func shouldHandleWorkload(wl *kueue.Workload) bool {
-	// We should handle all Workloads that have the cleanup slice finalizer.
-	if controllerutil.ContainsFinalizer(wl, SliceControllerName) {
-		return true
-	}
-	finalize, _ := shouldFinalize(wl)
-	// If the Workload doesn’t have a finalizer, we can handle only relevant workloads.
- return !finalize && validateRelevantWorkload(wl) == nil -} - -func (r *WorkloadReconciler) handleEvent(obj client.Object) bool { - wl, isWorkload := obj.(*kueue.Workload) - if !isWorkload { - return true - } - return shouldHandleWorkload(wl) -} - var _ handler.EventHandler = (*sliceHandler)(nil) type sliceHandler struct { diff --git a/slice/internal/controller/workload_controller_test.go b/slice/internal/controller/workload_controller_test.go index cd4f4ba41..633057eb9 100644 --- a/slice/internal/controller/workload_controller_test.go +++ b/slice/internal/controller/workload_controller_test.go @@ -780,87 +780,6 @@ func TestWorkloadReconciler(t *testing.T) { } } -func TestWorkloadReconcilerHandleEvent(t *testing.T) { - now := time.Now().Truncate(time.Second) - - baseWorkloadWithWithAdmission := utiltesting.MakeWorkload("wl", corev1.NamespaceDefault). - PodSets( - *utiltesting.MakePodSet("ps", 2). - Annotation(core.TPUTopologyAnnotation, "4x4x12"). - NodeSelector(core.TPUAcceleratorLabel, "tpu-v7x"). - Obj(), - ). - ReserveQuota( - &kueue.Admission{ - PodSetAssignments: []kueue.PodSetAssignment{ - utiltesting.MakePodSetAssignment("ps1"). - TopologyAssignment([]string{core.TPUBlockLabel, core.TPUSubBlockLabel}, []kueue.TopologyDomainAssignment{ - { - Values: []string{"domain1", "domain2"}, - Count: 2, - }, - }).Obj(), - }, - }, now, - ) - baseWorkloadWithAdmissionOwnerReference := baseWorkloadWithWithAdmission.Clone(). - ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset", "jobset") - - cases := map[string]struct { - obj client.Object - want bool - }{ - "invalid object": { - obj: utiltestingjobspod.MakePod("pod", corev1.NamespaceDefault).Obj(), - want: true, - }, - "has cleanup slice finalizer": { - obj: utiltesting.MakeWorkload("wl", corev1.NamespaceDefault). - Finalizers(SliceControllerName). - Obj(), - want: true, - }, - "valid workload": { - obj: baseWorkloadWithAdmissionOwnerReference.DeepCopy(), - want: true, - }, - "doesn't have owner reference": { - obj: baseWorkloadWithWithAdmission.DeepCopy(), - want: false, - }, - "has unsupported owner reference": { - obj: baseWorkloadWithWithAdmission.Clone(). - ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "job"). - Obj(), - want: false, - }, - "has DeletionTimestamp": { - obj: baseWorkloadWithAdmissionOwnerReference.Clone().DeletionTimestamp(now).Obj(), - want: false, - }, - "finished": { - obj: baseWorkloadWithAdmissionOwnerReference.Clone().Finished().Obj(), - want: false, - }, - "evicted": { - obj: baseWorkloadWithAdmissionOwnerReference.Clone().Active(false).Obj(), - want: false, - }, - "deactivated": { - obj: baseWorkloadWithAdmissionOwnerReference.Clone().Evicted().Obj(), - want: false, - }, - } - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - got := NewWorkloadReconciler(nil, nil).handleEvent(tc.obj) - if diff := cmp.Diff(tc.want, got); diff != "" { - t.Errorf("Result after Update (-want,+got):\n%s", diff) - } - }) - } -} - func TestSliceHandlerHandleEvent(t *testing.T) { const ( baseWlName = "wl" From 664ce92fc3d6495726d14f7840843ac3a20299da Mon Sep 17 00:00:00 2001 From: Mykhailo Bobrovskyi Date: Thu, 31 Jul 2025 14:30:55 +0300 Subject: [PATCH 3/3] Fix lint. 
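
The "k8s.io/utils/ptr" import added in the first patch landed in the
standard-library import group; move it into the k8s.io group so the
import-ordering lint check passes. Assuming the usual goimports-style
grouping (standard library first, then external modules), the intended
layout is:

    import (
        "errors"
        "fmt"
        "io"
        "os"

        "k8s.io/utils/ptr"
    )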
--- slice/test/utils/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slice/test/utils/utils.go b/slice/test/utils/utils.go index 59c6247ea..5ab81f07f 100644 --- a/slice/test/utils/utils.go +++ b/slice/test/utils/utils.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "io" - "k8s.io/utils/ptr" "os" "os/exec" "strconv" @@ -39,6 +38,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"