Skip to content

Commit dd1f55e

Browse files
committed
Refactor syncAdmissionCheckStatus().
1 parent bad2360 commit dd1f55e

File tree

6 files changed

+155
-86
lines changed

6 files changed

+155
-86
lines changed

slice/internal/controller/workload_controller.go

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -446,22 +446,23 @@ func (r *WorkloadReconciler) syncSlices(ctx context.Context, wl *kueue.Workload,
446446
}
447447

448448
if changed {
449-
msg := fmt.Sprintf("The Slices %v have been created", joinSliceNames(slices))
449+
msg := buildACStatusMessageForCreation(slices)
450450
ctrl.LoggerFrom(ctx).V(3).Info(msg)
451451
r.record.Event(wl, corev1.EventTypeNormal, SlicesCreatedEventType, msg)
452+
ac.State = kueue.CheckStatePending
452453
ac.Message = msg
453454
return true, r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac)
454455
}
455456

456457
return false, nil
457458
}
458459

459-
func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, psa *kueue.PodSetAssignment) {
460+
func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, topologyAssignment *kueue.TopologyAssignment) {
460461
nodeSelectors := sets.New[string]()
461462
// we already validated that all assignments have a valid level,
462463
// in validateRelevantWorkload.
463-
subblockLevelIndex := topology.SubblockLevelIndex(psa)
464-
for _, domain := range psa.TopologyAssignment.Domains {
464+
subblockLevelIndex := topology.SubblockLevelIndex(topologyAssignment)
465+
for _, domain := range topologyAssignment.Domains {
465466
nodeSelectors.Insert(domain.Values[subblockLevelIndex])
466467
}
467468
slice.Spec.NodeSelector = map[string][]string{
@@ -477,7 +478,7 @@ func (r *WorkloadReconciler) createSlice(ctx context.Context, wl *kueue.Workload
477478
if err := controllerutil.SetControllerReference(wl, slice, r.client.Scheme()); err != nil {
478479
return nil, err
479480
}
480-
parseTopologyAssignmentIntoNodeSelector(slice, psa)
481+
parseTopologyAssignmentIntoNodeSelector(slice, psa.TopologyAssignment)
481482

482483
if err := r.client.Create(ctx, slice); err != nil {
483484
msg := fmt.Sprintf("Error creating Slice %q: %v", client.ObjectKeyFromObject(slice), err)
@@ -502,59 +503,98 @@ func (r *WorkloadReconciler) updateWorkloadAdmissionCheckStatus(ctx context.Cont
502503
return err
503504
}
504505

505-
func joinSliceNames(slices []v1alpha1.Slice) string {
506+
func buildACStatusMessageForCreation(slices []v1alpha1.Slice) string {
506507
sliceNames := make([]string, len(slices))
507508
for index, slice := range slices {
508509
sliceNames[index] = fmt.Sprintf("%q", client.ObjectKeyFromObject(&slice))
509510
}
510511
sort.Strings(sliceNames)
511-
return strings.Join(sliceNames, ", ")
512+
return fmt.Sprintf("The Slices %s have been created", strings.Join(sliceNames, ", "))
512513
}
513514

514515
// syncAdmissionCheckStatus syncs the admission check status with the state of the Slices.
515516
func (r *WorkloadReconciler) syncAdmissionCheckStatus(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices []v1alpha1.Slice) error {
516517
originalState := ac.State
517518
originalMessage := ac.Message
518519

519-
slicesByStatus := make(map[v1alpha1.SliceConditionType][]v1alpha1.Slice)
520-
for _, slice := range slices {
521-
for _, status := range core.SliceStatuses {
522-
if meta.IsStatusConditionTrue(slice.Status.Conditions, string(status)) {
523-
slicesByStatus[status] = append(slicesByStatus[status], slice)
524-
}
525-
}
526-
}
520+
slicesByStatus := groupSlicesByStatus(slices)
527521

528522
switch {
529-
case len(slicesByStatus[v1alpha1.Error]) > 0:
530-
ac.State = kueue.CheckStateRejected
531-
ac.Message = fmt.Sprintf("The Slices %s are not operational due to an errors", joinSliceNames(slicesByStatus[v1alpha1.Error]))
532-
case len(slicesByStatus[v1alpha1.Deformed]) > 0:
523+
case len(slicesByStatus[v1alpha1.Error]) > 0 || len(slicesByStatus[v1alpha1.Deformed]) > 0:
533524
ac.State = kueue.CheckStateRejected
534-
ac.Message = fmt.Sprintf("The Slices %s are being torn down", joinSliceNames(slicesByStatus[v1alpha1.Deformed]))
535-
case len(slicesByStatus[v1alpha1.Forming]) > 0:
536-
ac.State = kueue.CheckStatePending
537-
ac.Message = fmt.Sprintf("The Slices %s are being formed", joinSliceNames(slicesByStatus[v1alpha1.Forming]))
538-
case len(slicesByStatus[v1alpha1.Degraded]) > 0:
539-
ac.State = kueue.CheckStateReady
540-
ac.Message = fmt.Sprintf("The Slices %s are running with reduced capacity or performance", joinSliceNames(slicesByStatus[v1alpha1.Degraded]))
541-
case len(slicesByStatus[v1alpha1.Ready]) > 0:
525+
case len(slices) == len(slicesByStatus[v1alpha1.Degraded])+len(slicesByStatus[v1alpha1.Ready]):
542526
ac.State = kueue.CheckStateReady
543-
ac.Message = fmt.Sprintf("The Slices %s are fully operational", joinSliceNames(slicesByStatus[v1alpha1.Ready]))
527+
default:
528+
ac.State = kueue.CheckStatePending
544529
}
545530

531+
ac.Message = buildACStatusMessageForSync(slicesByStatus)
532+
546533
// No changes.
547534
if originalState == ac.State && ac.Message == originalMessage {
548535
return nil
549536
}
550537

551-
err := r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac)
552-
if err == nil && originalState != ac.State {
538+
if err := r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac); err != nil {
539+
return err
540+
}
541+
542+
log := ctrl.LoggerFrom(ctx)
543+
544+
if originalState != ac.State {
553545
message := fmt.Sprintf("Admission check %q updated state from %q to %q", ac.Name, originalState, ac.State)
546+
log.V(3).Info(message)
554547
r.record.Event(wl, corev1.EventTypeNormal, AdmissionCheckUpdatedEventType, message)
555548
}
556549

557-
return err
550+
if ac.Message != originalMessage {
551+
// Log the error message of every Slice that is in the Error state.
552+
for _, slice := range slicesByStatus[v1alpha1.Error] {
553+
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
554+
log.V(2).Info(
555+
"WARNING: The Slice are not operational due to an error",
556+
"slice", klog.KObj(&slice),
557+
"error", cond.Message,
558+
)
559+
}
560+
}
561+
562+
return nil
563+
}
564+
565+
func groupSlicesByStatus(slices []v1alpha1.Slice) map[v1alpha1.SliceConditionType][]v1alpha1.Slice {
566+
slicesByStatus := make(map[v1alpha1.SliceConditionType][]v1alpha1.Slice)
567+
for _, slice := range slices {
568+
for _, status := range []v1alpha1.SliceConditionType{
569+
v1alpha1.Error, v1alpha1.Deformed, v1alpha1.Forming, v1alpha1.Degraded, v1alpha1.Ready,
570+
} {
571+
if meta.IsStatusConditionTrue(slice.Status.Conditions, string(status)) {
572+
slicesByStatus[status] = append(slicesByStatus[status], slice)
573+
break
574+
}
575+
}
576+
}
577+
return slicesByStatus
578+
}
579+
580+
func buildACStatusMessageForSync(slices map[v1alpha1.SliceConditionType][]v1alpha1.Slice) string {
581+
msg := fmt.Sprintf(
582+
"Slices are in states: %d Forming, %d Degraded, %d Ready, %d Error, %d Deformed",
583+
len(slices[v1alpha1.Forming]),
584+
len(slices[v1alpha1.Degraded]),
585+
len(slices[v1alpha1.Ready]),
586+
len(slices[v1alpha1.Error]),
587+
len(slices[v1alpha1.Deformed]),
588+
)
589+
if len(slices[v1alpha1.Error]) > 0 {
590+
var errMessages []string
591+
for _, slice := range slices[v1alpha1.Error] {
592+
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
593+
errMessages = append(errMessages, cond.Message)
594+
}
595+
msg += ". Errors: " + strings.Join(errMessages, "; ")
596+
}
597+
return api.TruncateConditionMessage(msg)
558598
}
559599

560600
// SetupWithManager sets up the controller with the Manager.

slice/internal/controller/workload_controller_test.go

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,30 @@ func TestWorkloadReconciler(t *testing.T) {
649649
},
650650
},
651651
},
652-
"should update the Workload AdmissionCheckState when Slices status is changed to Forming": {
652+
"should update the Workload's AdmissionCheckState when one Slice is in the Forming state": {
653+
request: baseRequest,
654+
objs: []client.Object{
655+
baseAdmissionCheckWrapper.DeepCopy(),
656+
baseWorkloadWrapperWithFinalizer.DeepCopy(),
657+
baseSlice1Wrapper.Clone().Forming().Obj(),
658+
baseSlice2Wrapper.Clone().Obj(),
659+
},
660+
wantWorkloads: []kueue.Workload{
661+
*baseWorkloadWrapperWithFinalizer.Clone().
662+
AdmissionCheck(kueue.AdmissionCheckState{
663+
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
664+
State: kueue.CheckStatePending,
665+
LastTransitionTime: metav1.NewTime(now),
666+
Message: `Slices are in states: 1 Forming, 0 Degraded, 0 Ready, 0 Error, 0 Deformed`,
667+
}).
668+
Obj(),
669+
},
670+
wantSlices: []slice.Slice{
671+
*baseSlice1Wrapper.Clone().Forming().Obj(),
672+
*baseSlice2Wrapper.Clone().Obj(),
673+
},
674+
},
675+
"should update the Workload's AdmissionCheckState when all Slices are in the Forming state": {
653676
request: baseRequest,
654677
objs: []client.Object{
655678
baseAdmissionCheckWrapper.DeepCopy(),
@@ -663,7 +686,7 @@ func TestWorkloadReconciler(t *testing.T) {
663686
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
664687
State: kueue.CheckStatePending,
665688
LastTransitionTime: metav1.NewTime(now),
666-
Message: `The Slices "default/workload-ps1", "default/workload-ps2" are being formed`,
689+
Message: `Slices are in states: 2 Forming, 0 Degraded, 0 Ready, 0 Error, 0 Deformed`,
667690
}).
668691
Obj(),
669692
},
@@ -672,7 +695,30 @@ func TestWorkloadReconciler(t *testing.T) {
672695
*baseSlice2Wrapper.Clone().Forming().Obj(),
673696
},
674697
},
675-
"should update the Workload AdmissionCheckState when the Slice status is changed to Ready": {
698+
"should update the Workload's AdmissionCheckState when one Slice is in the Ready state": {
699+
request: baseRequest,
700+
objs: []client.Object{
701+
baseAdmissionCheckWrapper.DeepCopy(),
702+
baseWorkloadWrapperWithFinalizer.DeepCopy(),
703+
baseSlice1Wrapper.Clone().Ready().Obj(),
704+
baseSlice2Wrapper.Clone().Forming().Obj(),
705+
},
706+
wantWorkloads: []kueue.Workload{
707+
*baseWorkloadWrapperWithFinalizer.Clone().
708+
AdmissionCheck(kueue.AdmissionCheckState{
709+
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
710+
State: kueue.CheckStatePending,
711+
LastTransitionTime: metav1.NewTime(now),
712+
Message: `Slices are in states: 1 Forming, 0 Degraded, 1 Ready, 0 Error, 0 Deformed`,
713+
}).
714+
Obj(),
715+
},
716+
wantSlices: []slice.Slice{
717+
*baseSlice1Wrapper.Clone().Ready().Obj(),
718+
*baseSlice2Wrapper.Clone().Forming().Obj(),
719+
},
720+
},
721+
"should update the Workload's AdmissionCheckState when all Slices are in the Ready state": {
676722
request: baseRequest,
677723
objs: []client.Object{
678724
baseAdmissionCheckWrapper.DeepCopy(),
@@ -686,7 +732,7 @@ func TestWorkloadReconciler(t *testing.T) {
686732
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
687733
State: kueue.CheckStateReady,
688734
LastTransitionTime: metav1.NewTime(now),
689-
Message: `The Slices "default/workload-ps1", "default/workload-ps2" are fully operational`,
735+
Message: `Slices are in states: 0 Forming, 0 Degraded, 2 Ready, 0 Error, 0 Deformed`,
690736
}).
691737
Obj(),
692738
},
@@ -703,12 +749,12 @@ func TestWorkloadReconciler(t *testing.T) {
703749
},
704750
},
705751
},
706-
"should update the Workload AdmissionCheckState when the Slice status is changed to Degraded": {
752+
"should update the Workload's AdmissionCheckState when one Slice is in the Ready state and another is in the Degraded state": {
707753
request: baseRequest,
708754
objs: []client.Object{
709755
baseAdmissionCheckWrapper.DeepCopy(),
710756
baseWorkloadWrapperWithFinalizer.DeepCopy(),
711-
baseSlice1Wrapper.Clone().Degraded().Obj(),
757+
baseSlice1Wrapper.Clone().Ready().Obj(),
712758
baseSlice2Wrapper.Clone().Degraded().Obj(),
713759
},
714760
wantWorkloads: []kueue.Workload{
@@ -717,12 +763,12 @@ func TestWorkloadReconciler(t *testing.T) {
717763
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
718764
State: kueue.CheckStateReady,
719765
LastTransitionTime: metav1.NewTime(now),
720-
Message: `The Slices "default/workload-ps1", "default/workload-ps2" are running with reduced capacity or performance`,
766+
Message: `Slices are in states: 0 Forming, 1 Degraded, 1 Ready, 0 Error, 0 Deformed`,
721767
}).
722768
Obj(),
723769
},
724770
wantSlices: []slice.Slice{
725-
*baseSlice1Wrapper.Clone().Degraded().Obj(),
771+
*baseSlice1Wrapper.Clone().Ready().Obj(),
726772
*baseSlice2Wrapper.Clone().Degraded().Obj()},
727773
wantEvents: []utiltesting.EventRecord{
728774
{
@@ -733,27 +779,27 @@ func TestWorkloadReconciler(t *testing.T) {
733779
},
734780
},
735781
},
736-
"should update the Workload AdmissionCheckState when the Slice status is changed to Deformed": {
782+
"should update the Workload's AdmissionCheckState when one Slice is in the Error state": {
737783
request: baseRequest,
738784
objs: []client.Object{
739785
baseAdmissionCheckWrapper.DeepCopy(),
740786
baseWorkloadWrapperWithFinalizer.DeepCopy(),
741-
baseSlice1Wrapper.Clone().Deformed().Obj(),
742-
baseSlice2Wrapper.Clone().Deformed().Obj(),
787+
baseSlice1Wrapper.Clone().Ready().Obj(),
788+
baseSlice2Wrapper.Clone().Error().Obj(),
743789
},
744790
wantWorkloads: []kueue.Workload{
745791
*baseWorkloadWrapperWithFinalizer.Clone().
746792
AdmissionCheck(kueue.AdmissionCheckState{
747793
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
748794
State: kueue.CheckStateRejected,
749795
LastTransitionTime: metav1.NewTime(now),
750-
Message: `The Slices "default/workload-ps1", "default/workload-ps2" are being torn down`,
796+
Message: `Slices are in states: 0 Forming, 0 Degraded, 1 Ready, 1 Error, 0 Deformed. Errors: Error by test`,
751797
}).
752798
Obj(),
753799
},
754800
wantSlices: []slice.Slice{
755-
*baseSlice1Wrapper.Clone().Deformed().Obj(),
756-
*baseSlice2Wrapper.Clone().Deformed().Obj()},
801+
*baseSlice1Wrapper.Clone().Ready().Obj(),
802+
*baseSlice2Wrapper.Clone().Error().Obj()},
757803
wantEvents: []utiltesting.EventRecord{
758804
{
759805
Key: client.ObjectKeyFromObject(baseWorkloadWrapper),
@@ -763,28 +809,27 @@ func TestWorkloadReconciler(t *testing.T) {
763809
},
764810
},
765811
},
766-
"should update the Workload AdmissionCheckState when the Slice status is changed to Error": {
812+
"should update the Workload's AdmissionCheckState when one Slice is in the Deformed state": {
767813
request: baseRequest,
768814
objs: []client.Object{
769815
baseAdmissionCheckWrapper.DeepCopy(),
770816
baseWorkloadWrapperWithFinalizer.DeepCopy(),
771-
baseSlice1Wrapper.Clone().Error().Obj(),
772-
baseSlice2Wrapper.Clone().Error().Obj(),
817+
baseSlice1Wrapper.Clone().Ready().Obj(),
818+
baseSlice2Wrapper.Clone().Deformed().Obj(),
773819
},
774820
wantWorkloads: []kueue.Workload{
775821
*baseWorkloadWrapperWithFinalizer.Clone().
776822
AdmissionCheck(kueue.AdmissionCheckState{
777823
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
778824
State: kueue.CheckStateRejected,
779825
LastTransitionTime: metav1.NewTime(now),
780-
Message: `The Slices "default/workload-ps1", "default/workload-ps2" are not operational due to an errors`,
826+
Message: `Slices are in states: 0 Forming, 0 Degraded, 1 Ready, 0 Error, 1 Deformed`,
781827
}).
782828
Obj(),
783829
},
784830
wantSlices: []slice.Slice{
785-
*baseSlice1Wrapper.Clone().Error().Obj(),
786-
*baseSlice2Wrapper.Clone().Error().Obj(),
787-
},
831+
*baseSlice1Wrapper.Clone().Ready().Obj(),
832+
*baseSlice2Wrapper.Clone().Deformed().Obj()},
788833
wantEvents: []utiltesting.EventRecord{
789834
{
790835
Key: client.ObjectKeyFromObject(baseWorkloadWrapper),
@@ -809,7 +854,7 @@ func TestWorkloadReconciler(t *testing.T) {
809854
Name: kueue.AdmissionCheckReference(baseAdmissionCheckName),
810855
State: kueue.CheckStateReady,
811856
LastTransitionTime: metav1.NewTime(now),
812-
Message: `The Slices "default/workload-ps1", "default/workload-ps2" are fully operational`,
857+
Message: `Slices are in states: 0 Forming, 0 Degraded, 2 Ready, 0 Error, 0 Deformed`,
813858
}).
814859
Obj(),
815860
},

slice/internal/core/slice.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ import (
2525
"tpu-slice-controller/api/v1alpha1"
2626
)
2727

28-
var SliceStatuses = []v1alpha1.SliceConditionType{
29-
v1alpha1.Error, v1alpha1.Deformed, v1alpha1.Forming, v1alpha1.Degraded, v1alpha1.Ready,
30-
}
31-
3228
func SliceKeyFromWorkload(wl *kueue.Workload, podSetName kueue.PodSetReference) client.ObjectKey {
3329
slice := SliceWithMetadata(wl, podSetName)
3430
return client.ObjectKeyFromObject(slice)

slice/internal/topology/topology.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import (
2222

2323
// SubblockLevelIndex returns the index of the TPUSubBlock topology
2424
// assignment, or -1 if it doesn't exist.
25-
func SubblockLevelIndex(psa *kueue.PodSetAssignment) int {
26-
for i, level := range psa.TopologyAssignment.Levels {
25+
func SubblockLevelIndex(topologyAssignment *kueue.TopologyAssignment) int {
26+
for i, level := range topologyAssignment.Levels {
2727
if level == core.TPUSubBlockLabel {
2828
return i
2929
}

slice/internal/topology/validation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func AllAssignmentsValid(admission *kueue.Admission) bool {
3434
if psa.TopologyAssignment == nil {
3535
continue
3636
}
37-
if SubblockLevelIndex(&psa) == -1 {
37+
if SubblockLevelIndex(psa.TopologyAssignment) == -1 {
3838
return false
3939
}
4040
}

0 commit comments

Comments
 (0)