Skip to content

Commit c2bc1ba

Browse files
committed
Refactor syncAdmissionCheckStatus().
1 parent bad2360 commit c2bc1ba

File tree

7 files changed

+197
-102
lines changed

7 files changed

+197
-102
lines changed

slice/internal/controller/workload_controller.go

Lines changed: 81 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
164164
return ctrl.Result{}, err
165165
}
166166

167-
changed, err := r.syncSlices(ctx, wl, ac, slices)
168-
if err != nil || changed {
167+
err = r.syncSlices(ctx, wl, ac, &slices)
168+
if err != nil {
169169
return ctrl.Result{}, err
170170
}
171171

@@ -420,13 +420,17 @@ func (r *WorkloadReconciler) sliceAC(ctx context.Context, wl *kueue.Workload) (*
420420
return workload.FindAdmissionCheck(wl.Status.AdmissionChecks, relevantChecks[0]), nil
421421
}
422422

423-
func (r *WorkloadReconciler) syncSlices(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices []v1alpha1.Slice) (bool, error) {
424-
slicesByName := make(map[string]*v1alpha1.Slice, len(slices))
425-
for _, slice := range slices {
423+
func (r *WorkloadReconciler) syncSlices(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices *[]v1alpha1.Slice) error {
424+
slicesByName := make(map[string]*v1alpha1.Slice, len(*slices))
425+
for _, slice := range *slices {
426426
slicesByName[slice.Name] = &slice
427427
}
428428

429-
changed := false
429+
capacity := len(wl.Status.Admission.PodSetAssignments) - len(slicesByName)
430+
if capacity < 0 {
431+
capacity = 0
432+
}
433+
createdSlices := make([]v1alpha1.Slice, 0, capacity)
430434

431435
for _, psa := range wl.Status.Admission.PodSetAssignments {
432436
sliceName := core.SliceName(wl.Name, psa.Name)
@@ -438,30 +442,28 @@ func (r *WorkloadReconciler) syncSlices(ctx context.Context, wl *kueue.Workload,
438442

439443
createdSlice, err := r.createSlice(ctx, wl, ac, &psa)
440444
if err != nil {
441-
return false, err
445+
return err
442446
}
443447

444-
slices = append(slices, *createdSlice)
445-
changed = true
448+
*slices = append(*slices, *createdSlice)
449+
createdSlices = append(createdSlices, *createdSlice)
446450
}
447451

448-
if changed {
449-
msg := fmt.Sprintf("The Slices %v have been created", joinSliceNames(slices))
452+
if len(createdSlices) > 0 {
453+
msg := buildCreationEventMessage(*slices)
450454
ctrl.LoggerFrom(ctx).V(3).Info(msg)
451-
r.record.Event(wl, corev1.EventTypeNormal, SlicesCreatedEventType, msg)
452-
ac.Message = msg
453-
return true, r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac)
455+
r.record.Event(wl, corev1.EventTypeNormal, SlicesCreatedEventType, api.TruncateEventMessage(msg))
454456
}
455457

456-
return false, nil
458+
return nil
457459
}
458460

459-
func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, psa *kueue.PodSetAssignment) {
461+
func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, topologyAssignment *kueue.TopologyAssignment) {
460462
nodeSelectors := sets.New[string]()
461463
// we already validated that all assignments have a valid level,
462464
// in validateRelevantWorkload.
463-
subblockLevelIndex := topology.SubblockLevelIndex(psa)
464-
for _, domain := range psa.TopologyAssignment.Domains {
465+
subblockLevelIndex := topology.SubblockLevelIndex(topologyAssignment)
466+
for _, domain := range topologyAssignment.Domains {
465467
nodeSelectors.Insert(domain.Values[subblockLevelIndex])
466468
}
467469
slice.Spec.NodeSelector = map[string][]string{
@@ -477,7 +479,7 @@ func (r *WorkloadReconciler) createSlice(ctx context.Context, wl *kueue.Workload
477479
if err := controllerutil.SetControllerReference(wl, slice, r.client.Scheme()); err != nil {
478480
return nil, err
479481
}
480-
parseTopologyAssignmentIntoNodeSelector(slice, psa)
482+
parseTopologyAssignmentIntoNodeSelector(slice, psa.TopologyAssignment)
481483

482484
if err := r.client.Create(ctx, slice); err != nil {
483485
msg := fmt.Sprintf("Error creating Slice %q: %v", client.ObjectKeyFromObject(slice), err)
@@ -502,59 +504,94 @@ func (r *WorkloadReconciler) updateWorkloadAdmissionCheckStatus(ctx context.Cont
502504
return err
503505
}
504506

505-
func joinSliceNames(slices []v1alpha1.Slice) string {
507+
func buildCreationEventMessage(slices []v1alpha1.Slice) string {
506508
sliceNames := make([]string, len(slices))
507509
for index, slice := range slices {
508510
sliceNames[index] = fmt.Sprintf("%q", client.ObjectKeyFromObject(&slice))
509511
}
510512
sort.Strings(sliceNames)
511-
return strings.Join(sliceNames, ", ")
513+
return fmt.Sprintf("The Slices %s have been created", strings.Join(sliceNames, ", "))
512514
}
513515

514516
// syncAdmissionCheckStatus syncs the admission check status with the state of the Slices.
515517
func (r *WorkloadReconciler) syncAdmissionCheckStatus(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices []v1alpha1.Slice) error {
516518
originalState := ac.State
517519
originalMessage := ac.Message
518520

521+
prepareAdmissionCheckStatus(ac, slices)
522+
523+
// No changes.
524+
if originalState == ac.State && ac.Message == originalMessage {
525+
return nil
526+
}
527+
528+
if err := r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac); err != nil {
529+
return err
530+
}
531+
532+
log := ctrl.LoggerFrom(ctx)
533+
534+
if originalState != ac.State {
535+
message := fmt.Sprintf("Admission check %q updated state from %q to %q", ac.Name, originalState, ac.State)
536+
log.V(3).Info(message)
537+
r.record.Event(wl, corev1.EventTypeNormal, AdmissionCheckUpdatedEventType, message)
538+
}
539+
540+
if ac.Message != originalMessage {
541+
// Log the error message of each Slice that has an Error condition.
542+
for _, slice := range slices {
543+
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
544+
if cond != nil {
545+
log.V(2).Info(
546+
"WARNING: The Slice are not operational due to an error",
547+
"slice", klog.KObj(&slice),
548+
"error", cond.Message,
549+
)
550+
}
551+
}
552+
}
553+
554+
return nil
555+
}
556+
557+
func groupSlicesByStatus(slices []v1alpha1.Slice) map[v1alpha1.SliceConditionType][]v1alpha1.Slice {
519558
slicesByStatus := make(map[v1alpha1.SliceConditionType][]v1alpha1.Slice)
520559
for _, slice := range slices {
521-
for _, status := range core.SliceStatuses {
560+
for _, status := range core.SliceStates {
522561
if meta.IsStatusConditionTrue(slice.Status.Conditions, string(status)) {
523562
slicesByStatus[status] = append(slicesByStatus[status], slice)
563+
break
524564
}
525565
}
526566
}
567+
return slicesByStatus
568+
}
569+
570+
func prepareAdmissionCheckStatus(ac *kueue.AdmissionCheckState, slices []v1alpha1.Slice) {
571+
slicesByStatus := groupSlicesByStatus(slices)
527572

528573
switch {
529-
case len(slicesByStatus[v1alpha1.Error]) > 0:
574+
case len(slicesByStatus[v1alpha1.Error]) > 0 || len(slicesByStatus[v1alpha1.Deformed]) > 0:
530575
ac.State = kueue.CheckStateRejected
531-
ac.Message = fmt.Sprintf("The Slices %s are not operational due to an errors", joinSliceNames(slicesByStatus[v1alpha1.Error]))
532-
case len(slicesByStatus[v1alpha1.Deformed]) > 0:
533-
ac.State = kueue.CheckStateRejected
534-
ac.Message = fmt.Sprintf("The Slices %s are being torn down", joinSliceNames(slicesByStatus[v1alpha1.Deformed]))
535-
case len(slicesByStatus[v1alpha1.Forming]) > 0:
536-
ac.State = kueue.CheckStatePending
537-
ac.Message = fmt.Sprintf("The Slices %s are being formed", joinSliceNames(slicesByStatus[v1alpha1.Forming]))
538-
case len(slicesByStatus[v1alpha1.Degraded]) > 0:
576+
case len(slices) == len(slicesByStatus[v1alpha1.Degraded])+len(slicesByStatus[v1alpha1.Ready]):
539577
ac.State = kueue.CheckStateReady
540-
ac.Message = fmt.Sprintf("The Slices %s are running with reduced capacity or performance", joinSliceNames(slicesByStatus[v1alpha1.Degraded]))
541-
case len(slicesByStatus[v1alpha1.Ready]) > 0:
542-
ac.State = kueue.CheckStateReady
543-
ac.Message = fmt.Sprintf("The Slices %s are fully operational", joinSliceNames(slicesByStatus[v1alpha1.Ready]))
544578
}
545579

546-
// No changes.
547-
if originalState == ac.State && ac.Message == originalMessage {
548-
return nil
580+
ac.Message = fmt.Sprintf("Slices are in states: %d Created", len(slices))
581+
for _, state := range core.SliceStates {
582+
if count := len(slicesByStatus[state]); count > 0 {
583+
ac.Message += fmt.Sprintf(", %d %s", count, state)
584+
}
549585
}
550586

551-
err := r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac)
552-
if err == nil && originalState != ac.State {
553-
message := fmt.Sprintf("Admission check %q updated state from %q to %q", ac.Name, originalState, ac.State)
554-
r.record.Event(wl, corev1.EventTypeNormal, AdmissionCheckUpdatedEventType, message)
587+
if len(slicesByStatus[v1alpha1.Error]) > 0 {
588+
var errMessages []string
589+
for _, slice := range slicesByStatus[v1alpha1.Error] {
590+
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
591+
errMessages = append(errMessages, cond.Message)
592+
}
593+
ac.Message += ". Errors: " + strings.Join(errMessages, "; ")
555594
}
556-
557-
return err
558595
}
559596

560597
// SetupWithManager sets up the controller with the Manager.

0 commit comments

Comments
 (0)