Skip to content

Commit e4f97e2

Browse files
committed
Refactor syncAdmissionCheckStatus().
1 parent bad2360 commit e4f97e2

File tree

7 files changed

+211
-104
lines changed

7 files changed

+211
-104
lines changed

slice/internal/controller/workload_controller.go

Lines changed: 94 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
164164
return ctrl.Result{}, err
165165
}
166166

167-
changed, err := r.syncSlices(ctx, wl, ac, slices)
168-
if err != nil || changed {
167+
err = r.syncSlices(ctx, wl, ac, &slices)
168+
if err != nil {
169169
return ctrl.Result{}, err
170170
}
171171

@@ -420,13 +420,17 @@ func (r *WorkloadReconciler) sliceAC(ctx context.Context, wl *kueue.Workload) (*
420420
return workload.FindAdmissionCheck(wl.Status.AdmissionChecks, relevantChecks[0]), nil
421421
}
422422

423-
func (r *WorkloadReconciler) syncSlices(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices []v1alpha1.Slice) (bool, error) {
424-
slicesByName := make(map[string]*v1alpha1.Slice, len(slices))
425-
for _, slice := range slices {
423+
func (r *WorkloadReconciler) syncSlices(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices *[]v1alpha1.Slice) error {
424+
slicesByName := make(map[string]*v1alpha1.Slice, len(*slices))
425+
for _, slice := range *slices {
426426
slicesByName[slice.Name] = &slice
427427
}
428428

429-
changed := false
429+
capacity := len(wl.Status.Admission.PodSetAssignments) - len(slicesByName)
430+
if capacity < 0 {
431+
capacity = 0
432+
}
433+
createdSlices := make([]v1alpha1.Slice, 0, capacity)
430434

431435
for _, psa := range wl.Status.Admission.PodSetAssignments {
432436
sliceName := core.SliceName(wl.Name, psa.Name)
@@ -438,30 +442,28 @@ func (r *WorkloadReconciler) syncSlices(ctx context.Context, wl *kueue.Workload,
438442

439443
createdSlice, err := r.createSlice(ctx, wl, ac, &psa)
440444
if err != nil {
441-
return false, err
445+
return err
442446
}
443447

444-
slices = append(slices, *createdSlice)
445-
changed = true
448+
*slices = append(*slices, *createdSlice)
449+
createdSlices = append(createdSlices, *createdSlice)
446450
}
447451

448-
if changed {
449-
msg := fmt.Sprintf("The Slices %v have been created", joinSliceNames(slices))
452+
if len(createdSlices) > 0 {
453+
msg := buildCreationEventMessage(createdSlices)
450454
ctrl.LoggerFrom(ctx).V(3).Info(msg)
451-
r.record.Event(wl, corev1.EventTypeNormal, SlicesCreatedEventType, msg)
452-
ac.Message = msg
453-
return true, r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac)
455+
r.record.Event(wl, corev1.EventTypeNormal, SlicesCreatedEventType, api.TruncateEventMessage(msg))
454456
}
455457

456-
return false, nil
458+
return nil
457459
}
458460

459-
func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, psa *kueue.PodSetAssignment) {
461+
func parseTopologyAssignmentIntoNodeSelector(slice *v1alpha1.Slice, topologyAssignment *kueue.TopologyAssignment) {
460462
nodeSelectors := sets.New[string]()
461463
// we already validated that all assignments have a valid level,
462464
// in validateRelevantWorkload.
463-
subblockLevelIndex := topology.SubblockLevelIndex(psa)
464-
for _, domain := range psa.TopologyAssignment.Domains {
465+
subblockLevelIndex := topology.SubblockLevelIndex(topologyAssignment)
466+
for _, domain := range topologyAssignment.Domains {
465467
nodeSelectors.Insert(domain.Values[subblockLevelIndex])
466468
}
467469
slice.Spec.NodeSelector = map[string][]string{
@@ -477,7 +479,7 @@ func (r *WorkloadReconciler) createSlice(ctx context.Context, wl *kueue.Workload
477479
if err := controllerutil.SetControllerReference(wl, slice, r.client.Scheme()); err != nil {
478480
return nil, err
479481
}
480-
parseTopologyAssignmentIntoNodeSelector(slice, psa)
482+
parseTopologyAssignmentIntoNodeSelector(slice, psa.TopologyAssignment)
481483

482484
if err := r.client.Create(ctx, slice); err != nil {
483485
msg := fmt.Sprintf("Error creating Slice %q: %v", client.ObjectKeyFromObject(slice), err)
@@ -502,59 +504,106 @@ func (r *WorkloadReconciler) updateWorkloadAdmissionCheckStatus(ctx context.Cont
502504
return err
503505
}
504506

505-
func joinSliceNames(slices []v1alpha1.Slice) string {
507+
func buildCreationEventMessage(slices []v1alpha1.Slice) string {
506508
sliceNames := make([]string, len(slices))
507509
for index, slice := range slices {
508510
sliceNames[index] = fmt.Sprintf("%q", client.ObjectKeyFromObject(&slice))
509511
}
510512
sort.Strings(sliceNames)
511-
return strings.Join(sliceNames, ", ")
513+
return fmt.Sprintf("The Slices %s have been created", strings.Join(sliceNames, ", "))
512514
}
513515

514516
// syncAdmissionCheckStatus syncs the admission check status with the state of the Slices.
515517
func (r *WorkloadReconciler) syncAdmissionCheckStatus(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, slices []v1alpha1.Slice) error {
516518
originalState := ac.State
517519
originalMessage := ac.Message
518520

519-
slicesByStatus := make(map[v1alpha1.SliceConditionType][]v1alpha1.Slice)
521+
prepareAdmissionCheckStatus(ac, slices)
522+
523+
// No changes.
524+
if originalState == ac.State && ac.Message == originalMessage {
525+
return nil
526+
}
527+
528+
if err := r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac); err != nil {
529+
return err
530+
}
531+
532+
log := ctrl.LoggerFrom(ctx)
533+
534+
if originalState != ac.State {
535+
message := fmt.Sprintf("Admission check %q updated state from %q to %q", ac.Name, originalState, ac.State)
536+
log.V(3).Info(message)
537+
r.record.Event(wl, corev1.EventTypeNormal, AdmissionCheckUpdatedEventType, message)
538+
}
539+
540+
if ac.Message != originalMessage {
541+
// Logging error messages if exists
542+
for _, slice := range slices {
543+
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
544+
if cond != nil {
545+
log.V(2).Info(
546+
"WARNING: The Slice is not operational due to an error",
547+
"slice", klog.KObj(&slice),
548+
"error", cond.Message,
549+
)
550+
}
551+
}
552+
}
553+
554+
return nil
555+
}
556+
557+
func groupSlicesByState(slices []v1alpha1.Slice) (map[v1alpha1.SliceConditionType][]v1alpha1.Slice, []v1alpha1.Slice) {
558+
slicesByState := make(map[v1alpha1.SliceConditionType][]v1alpha1.Slice)
559+
var noState []v1alpha1.Slice
520560
for _, slice := range slices {
521-
for _, status := range core.SliceStatuses {
561+
foundState := false
562+
for _, status := range core.SliceStates {
522563
if meta.IsStatusConditionTrue(slice.Status.Conditions, string(status)) {
523-
slicesByStatus[status] = append(slicesByStatus[status], slice)
564+
slicesByState[status] = append(slicesByState[status], slice)
565+
foundState = true
566+
break
524567
}
525568
}
569+
if !foundState {
570+
noState = append(noState, slice)
571+
}
526572
}
573+
return slicesByState, noState
574+
}
575+
576+
func prepareAdmissionCheckStatus(ac *kueue.AdmissionCheckState, slices []v1alpha1.Slice) {
577+
slicesByState, noState := groupSlicesByState(slices)
527578

528579
switch {
529-
case len(slicesByStatus[v1alpha1.Error]) > 0:
580+
case len(slicesByState[v1alpha1.Error]) > 0 || len(slicesByState[v1alpha1.Deformed]) > 0:
530581
ac.State = kueue.CheckStateRejected
531-
ac.Message = fmt.Sprintf("The Slices %s are not operational due to an errors", joinSliceNames(slicesByStatus[v1alpha1.Error]))
532-
case len(slicesByStatus[v1alpha1.Deformed]) > 0:
533-
ac.State = kueue.CheckStateRejected
534-
ac.Message = fmt.Sprintf("The Slices %s are being torn down", joinSliceNames(slicesByStatus[v1alpha1.Deformed]))
535-
case len(slicesByStatus[v1alpha1.Forming]) > 0:
536-
ac.State = kueue.CheckStatePending
537-
ac.Message = fmt.Sprintf("The Slices %s are being formed", joinSliceNames(slicesByStatus[v1alpha1.Forming]))
538-
case len(slicesByStatus[v1alpha1.Degraded]) > 0:
582+
case len(slices) == len(slicesByState[v1alpha1.Degraded])+len(slicesByState[v1alpha1.Ready]):
539583
ac.State = kueue.CheckStateReady
540-
ac.Message = fmt.Sprintf("The Slices %s are running with reduced capacity or performance", joinSliceNames(slicesByStatus[v1alpha1.Degraded]))
541-
case len(slicesByStatus[v1alpha1.Ready]) > 0:
542-
ac.State = kueue.CheckStateReady
543-
ac.Message = fmt.Sprintf("The Slices %s are fully operational", joinSliceNames(slicesByStatus[v1alpha1.Ready]))
544584
}
545585

546-
// No changes.
547-
if originalState == ac.State && ac.Message == originalMessage {
548-
return nil
586+
var stateMessages []string
587+
if len(noState) > 0 {
588+
stateMessages = append(stateMessages, fmt.Sprintf("%d Created", len(noState)))
549589
}
550590

551-
err := r.updateWorkloadAdmissionCheckStatus(ctx, wl, ac)
552-
if err == nil && originalState != ac.State {
553-
message := fmt.Sprintf("Admission check %q updated state from %q to %q", ac.Name, originalState, ac.State)
554-
r.record.Event(wl, corev1.EventTypeNormal, AdmissionCheckUpdatedEventType, message)
591+
for _, state := range core.SliceStates {
592+
if count := len(slicesByState[state]); count > 0 {
593+
stateMessages = append(stateMessages, fmt.Sprintf("%d %s", count, state))
594+
}
555595
}
556596

557-
return err
597+
ac.Message = fmt.Sprintf("Slices are in states: %s", strings.Join(stateMessages, ", "))
598+
599+
if len(slicesByState[v1alpha1.Error]) > 0 {
600+
var errMessages []string
601+
for _, slice := range slicesByState[v1alpha1.Error] {
602+
cond := meta.FindStatusCondition(slice.Status.Conditions, string(v1alpha1.Error))
603+
errMessages = append(errMessages, cond.Message)
604+
}
605+
ac.Message += ". Errors: " + strings.Join(errMessages, "; ")
606+
}
558607
}
559608

560609
// SetupWithManager sets up the controller with the Manager.

0 commit comments

Comments
 (0)