Skip to content

Commit af80e19

Browse files
authored
[Scheduler] Replace AddMetadataToPod with AddMetadataToChildResource across all schedulers (#4123)
* [Scheduler] Replace AddMetadataToPod with AddMetadataToChildResource across all schedulers Signed-off-by: win5923 <ken89@kimo.com> * Extract AddSchedulerNameToObject to batchscheduler utils package Signed-off-by: win5923 <ken89@kimo.com> * Add test for rayv1.RayCluster to AddSchedulerNameToObject Signed-off-by: win5923 <ken89@kimo.com> --------- Signed-off-by: win5923 <ken89@kimo.com>
1 parent 5990b05 commit af80e19

File tree

10 files changed

+152
-102
lines changed

10 files changed

+152
-102
lines changed

ray-operator/controllers/ray/batchscheduler/interface/interface.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@ package schedulerinterface
33
import (
44
"context"
55

6-
corev1 "k8s.io/api/core/v1"
76
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
87
"k8s.io/apimachinery/pkg/runtime"
98
"k8s.io/client-go/rest"
109
"sigs.k8s.io/controller-runtime/pkg/builder"
1110
"sigs.k8s.io/controller-runtime/pkg/client"
12-
13-
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1411
)
1512

1613
// BatchScheduler manages submitting RayCluster pods to a third-party scheduler.
@@ -23,11 +20,6 @@ type BatchScheduler interface {
2320
// For most batch schedulers, this results in the creation of a PodGroup.
2421
DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error
2522

26-
// AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler.
27-
// For example, setting labels for queues / priority, and setting schedulerName.
28-
// This function will be removed once Rayjob Volcano scheduler integration is completed.
29-
AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod)
30-
3123
// AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
3224
// For example, setting labels for queues / priority, and setting schedulerName.
3325
AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
@@ -63,9 +55,6 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
6355
return nil
6456
}
6557

66-
func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
67-
}
68-
6958
func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
7059
}
7160

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ package kaischeduler
99

1010
import (
1111
"context"
12+
"fmt"
1213

13-
corev1 "k8s.io/api/core/v1"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1515
"k8s.io/apimachinery/pkg/runtime"
1616
"k8s.io/client-go/rest"
@@ -20,6 +20,7 @@ import (
2020

2121
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
2222
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
23+
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/utils"
2324
)
2425

2526
const (
@@ -34,27 +35,32 @@ func GetPluginName() string { return "kai-scheduler" }
3435

3536
func (k *KaiScheduler) Name() string { return GetPluginName() }
3637

37-
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1.Object) error {
38+
func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, object metav1.Object) error {
39+
_, ok := object.(*rayv1.RayCluster)
40+
if !ok {
41+
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
42+
}
3843
return nil
3944
}
4045

41-
func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
46+
func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, _ string) {
4247
logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler")
43-
pod.Spec.SchedulerName = k.Name()
48+
utils.AddSchedulerNameToObject(child, k.Name())
4449

45-
queue, ok := app.Labels[QueueLabelName]
50+
parentLabel := parent.GetLabels()
51+
queue, ok := parentLabel[QueueLabelName]
4652
if !ok || queue == "" {
47-
logger.Info("Queue label missing from RayCluster; pods will remain pending",
53+
logger.Info("Queue label missing from parent; child will remain pending",
4854
"requiredLabel", QueueLabelName)
4955
return
5056
}
51-
if pod.Labels == nil {
52-
pod.Labels = make(map[string]string)
53-
}
54-
pod.Labels[QueueLabelName] = queue
55-
}
5657

57-
func (k *KaiScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
58+
childLabels := child.GetLabels()
59+
if childLabels == nil {
60+
childLabels = make(map[string]string)
61+
}
62+
childLabels[QueueLabelName] = queue
63+
child.SetLabels(childLabels)
5864
}
5965

6066
func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func createTestPod() *corev1.Pod {
4141
}
4242
}
4343

44-
func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
44+
func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) {
4545
a := assert.New(t)
4646
scheduler := &KaiScheduler{}
4747
ctx := context.Background()
@@ -52,8 +52,8 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
5252
})
5353
pod := createTestPod()
5454

55-
// Call AddMetadataToPod
56-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
55+
// Call AddMetadataToChildResource
56+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
5757

5858
// Assert scheduler name is set to kai-scheduler
5959
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -63,7 +63,7 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
6363
a.Equal("test-queue", pod.Labels[QueueLabelName])
6464
}
6565

66-
func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
66+
func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) {
6767
a := assert.New(t)
6868
scheduler := &KaiScheduler{}
6969
ctx := context.Background()
@@ -72,8 +72,8 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
7272
rayCluster := createTestRayCluster(map[string]string{})
7373
pod := createTestPod()
7474

75-
// Call AddMetadataToPod
76-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
75+
// Call AddMetadataToChildResource
76+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
7777

7878
// Assert scheduler name is still set (always required)
7979
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -85,7 +85,7 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
8585
}
8686
}
8787

88-
func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
88+
func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) {
8989
a := assert.New(t)
9090
scheduler := &KaiScheduler{}
9191
ctx := context.Background()
@@ -96,8 +96,8 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
9696
})
9797
pod := createTestPod()
9898

99-
// Call AddMetadataToPod
100-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
99+
// Call AddMetadataToChildResource
100+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
101101

102102
// Assert scheduler name is still set
103103
a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -109,7 +109,7 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
109109
}
110110
}
111111

112-
func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
112+
func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) {
113113
a := assert.New(t)
114114
scheduler := &KaiScheduler{}
115115
ctx := context.Background()
@@ -126,8 +126,8 @@ func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
126126
"app": "ray",
127127
}
128128

129-
// Call AddMetadataToPod
130-
scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
129+
// Call AddMetadataToChildResource
130+
scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
131131

132132
// Assert scheduler name is set
133133
a.Equal("kai-scheduler", pod.Spec.SchedulerName)

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
corev1 "k8s.io/api/core/v1"
87
"k8s.io/apimachinery/pkg/api/errors"
98
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
109
"k8s.io/apimachinery/pkg/runtime"
@@ -17,6 +16,7 @@ import (
1716

1817
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
1918
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
19+
batchschedulerutils "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/utils"
2020
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
2121
)
2222

@@ -93,21 +93,23 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, objec
9393
return nil
9494
}
9595

96-
// AddMetadataToPod adds essential labels and annotations to the Ray pod
96+
// AddMetadataToChildResource adds essential labels and annotations to the child resource.
9797
// the scheduler needs these labels and annotations in order to do the scheduling properly
98-
func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) {
99-
// when gang scheduling is enabled, extra labels need to be added to all pods
100-
if k.isGangSchedulingEnabled(rayCluster) {
101-
pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name
98+
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, _ string) {
99+
// when gang scheduling is enabled, extra labels need to be added to all child resources
100+
if k.isGangSchedulingEnabled(parent) {
101+
labels := child.GetLabels()
102+
if labels == nil {
103+
labels = make(map[string]string)
104+
}
105+
labels[kubeSchedulerPodGroupLabelKey] = parent.GetName()
106+
child.SetLabels(labels)
102107
}
103-
pod.Spec.SchedulerName = k.Name()
104-
}
105-
106-
func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
108+
batchschedulerutils.AddSchedulerNameToObject(child, k.Name())
107109
}
108110

109-
func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
110-
_, exist := app.Labels[utils.RayGangSchedulingEnabled]
111+
func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
112+
_, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled]
111113
return exist
112114
}
113115

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func TestCreatePodGroupWithMultipleHosts(t *testing.T) {
117117
a.Equal(int32(5), podGroup.Spec.MinMember)
118118
}
119119

120-
func TestAddMetadataToPod(t *testing.T) {
120+
func TestAddMetadataToChildResource(t *testing.T) {
121121
tests := []struct {
122122
name string
123123
enableGang bool
@@ -150,7 +150,7 @@ func TestAddMetadataToPod(t *testing.T) {
150150
}
151151

152152
scheduler := &KubeScheduler{}
153-
scheduler.AddMetadataToPod(context.TODO(), &cluster, "worker", pod)
153+
scheduler.AddMetadataToChildResource(context.TODO(), &cluster, pod, "worker")
154154

155155
if tt.enableGang {
156156
a.Equal(cluster.Name, pod.Labels[kubeSchedulerPodGroupLabelKey])
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package utils
2+
3+
import (
4+
corev1 "k8s.io/api/core/v1"
5+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
6+
)
7+
8+
// AddSchedulerNameToObject sets the schedulerName field on Pod and PodTemplateSpec resources.
9+
// Used to assign batch scheduler names to:
10+
// - Head pod and worker pod in RayCluster
11+
// - Job in RayJob
12+
func AddSchedulerNameToObject(obj metav1.Object, schedulerName string) {
13+
switch obj := obj.(type) {
14+
case *corev1.Pod:
15+
obj.Spec.SchedulerName = schedulerName
16+
case *corev1.PodTemplateSpec:
17+
obj.Spec.SchedulerName = schedulerName
18+
}
19+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package utils
2+
3+
import (
4+
"testing"
5+
6+
corev1 "k8s.io/api/core/v1"
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
9+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
10+
)
11+
12+
func TestAddSchedulerNameToObject(t *testing.T) {
13+
schedulerName := "test-scheduler"
14+
15+
t.Run("Pod object should have schedulerName set", func(t *testing.T) {
16+
pod := &corev1.Pod{
17+
ObjectMeta: metav1.ObjectMeta{
18+
Name: "test-pod",
19+
Namespace: "default",
20+
},
21+
Spec: corev1.PodSpec{},
22+
}
23+
24+
AddSchedulerNameToObject(pod, schedulerName)
25+
26+
if pod.Spec.SchedulerName != schedulerName {
27+
t.Errorf("expected schedulerName to be %q, got %q", schedulerName, pod.Spec.SchedulerName)
28+
}
29+
})
30+
31+
t.Run("PodTemplateSpec object should have schedulerName set", func(t *testing.T) {
32+
podTemplate := &corev1.PodTemplateSpec{
33+
ObjectMeta: metav1.ObjectMeta{
34+
Name: "test-template",
35+
Namespace: "default",
36+
},
37+
Spec: corev1.PodSpec{},
38+
}
39+
40+
AddSchedulerNameToObject(podTemplate, schedulerName)
41+
42+
if podTemplate.Spec.SchedulerName != schedulerName {
43+
t.Errorf("expected schedulerName to be %q, got %q", schedulerName, podTemplate.Spec.SchedulerName)
44+
}
45+
})
46+
47+
t.Run("RayCluster object should not be modified", func(t *testing.T) {
48+
// When AddMetadataToChildResource is called with a RayCluster,
49+
// only the metadata propagation applies. The schedulerName is set later on actual Pods
50+
// (Head/Worker Pods for RayCluster or submitter Job for RayJob), not on the RayCluster itself.
51+
// This test validates the intentional silent no-op behavior for unsupported types.
52+
rayCluster := &rayv1.RayCluster{
53+
ObjectMeta: metav1.ObjectMeta{
54+
Name: "test-raycluster",
55+
Namespace: "default",
56+
},
57+
Spec: rayv1.RayClusterSpec{
58+
HeadGroupSpec: rayv1.HeadGroupSpec{
59+
Template: corev1.PodTemplateSpec{
60+
Spec: corev1.PodSpec{
61+
Containers: []corev1.Container{
62+
{Name: "test", Image: "test"},
63+
},
64+
},
65+
},
66+
},
67+
},
68+
}
69+
70+
// Store original state
71+
originalSchedulerName := rayCluster.Spec.HeadGroupSpec.Template.Spec.SchedulerName
72+
73+
// This should not panic and should not modify the RayCluster's PodTemplateSpecs
74+
AddSchedulerNameToObject(rayCluster, schedulerName)
75+
76+
// Verify the RayCluster's PodTemplateSpec was not modified
77+
if rayCluster.Spec.HeadGroupSpec.Template.Spec.SchedulerName != originalSchedulerName {
78+
t.Errorf("RayCluster HeadGroupSpec.Template schedulerName was modified: expected %q, got %q",
79+
originalSchedulerName, rayCluster.Spec.HeadGroupSpec.Template.Spec.SchedulerName)
80+
}
81+
})
82+
}

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -251,19 +251,6 @@ func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, pa
251251
addSchedulerName(child, v.Name())
252252
}
253253

254-
// This function will be removed in interface migration PR
255-
func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
256-
pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
257-
pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName
258-
if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
259-
pod.Labels[QueueNameLabelKey] = queue
260-
}
261-
if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok {
262-
pod.Spec.PriorityClassName = priorityClassName
263-
}
264-
pod.Spec.SchedulerName = v.Name()
265-
}
266-
267254
func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
268255
if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
269256
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)

0 commit comments

Comments
 (0)