Skip to content

Commit 5f3afb3

Browse files
committed
add ray.io/kuberay-version annotations for head pod and worker pods
Signed-off-by: win5923 <ken89@kimo.com>
1 parent f3a9b81 commit 5f3afb3

File tree

3 files changed

+100
-8
lines changed

3 files changed

+100
-8
lines changed

ray-operator/controllers/ray/common/pod.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,12 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head
188188
// This ensures the privileges of KubeRay users are contained within the namespace of the RayCluster.
189189
podTemplate.ObjectMeta.Namespace = instance.Namespace
190190

191+
if podTemplate.Annotations == nil {
192+
podTemplate.Annotations = make(map[string]string)
193+
}
194+
// Set the KubeRay version used to create the pod
195+
podTemplate.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION
191196
if templateHash != "" {
192-
if podTemplate.Annotations == nil {
193-
podTemplate.Annotations = make(map[string]string)
194-
}
195197
podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash
196198
}
197199

@@ -335,15 +337,16 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo
335337

336338
podTemplate := workerSpec.Template
337339
podTemplate.GenerateName = podName
338-
339340
// Pods created by RayCluster should be restricted to the namespace of the RayCluster.
340341
// This ensures the privileges of KubeRay users are contained within the namespace of the RayCluster.
341342
podTemplate.ObjectMeta.Namespace = instance.Namespace
342343

344+
if podTemplate.Annotations == nil {
345+
podTemplate.Annotations = make(map[string]string)
346+
}
347+
// Set the KubeRay version used to create the pod
348+
podTemplate.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION
343349
if templateHash != "" {
344-
if podTemplate.Annotations == nil {
345-
podTemplate.Annotations = make(map[string]string)
346-
}
347350
podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash
348351
}
349352
// The Ray worker should only start once the GCS server is ready.

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1137,6 +1137,21 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context,
11371137
return false
11381138
}
11391139

1140+
// Case 1: If any pod was created by a different KubeRay version, refresh the pod annotations and return false; the template-hash comparison is deferred to the next reconciliation
1141+
for _, pod := range allPods.Items {
1142+
podVersion := pod.Annotations[utils.KubeRayVersion]
1143+
if podVersion != "" && podVersion != utils.KUBERAY_VERSION {
1144+
logger.Info("Pods have different KubeRay version, updating pod annotations",
1145+
"pod", pod.Name,
1146+
"podVersion", podVersion,
1147+
"currentVersion", utils.KUBERAY_VERSION)
1148+
if err := r.updatePodsAnnotations(ctx, instance, &allPods); err != nil {
1149+
logger.Error(err, "Failed to update pod annotations for KubeRay version change")
1150+
}
1151+
return false
1152+
}
1153+
}
1154+
11401155
headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template)
11411156
if err != nil {
11421157
logger.Error(err, "Failed to generate head template hash")
@@ -1153,7 +1168,7 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context,
11531168
workerHashMap[workerGroup.GroupName] = hash
11541169
}
11551170

1156-
// Check each pod to see if its template hash matches the current spec
1171+
// Case 2: If the pod template hash has changed, recreate all pods
11571172
for _, pod := range allPods.Items {
11581173
nodeType := pod.Labels[utils.RayNodeTypeLabelKey]
11591174
actualHash := pod.Annotations[utils.PodTemplateHashKey]
@@ -1183,6 +1198,61 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context,
11831198
return false
11841199
}
11851200

1201+
// updatePodsAnnotations rewrites the KubeRay version and pod template hash
// annotations on the pods in allPods so they match the running operator.
//
// Only pods whose utils.KubeRayVersion annotation is non-empty and differs
// from utils.KUBERAY_VERSION are touched; pods with no version annotation or
// with the current version are skipped. For each touched pod the template hash
// is recomputed from instance's current spec via calculatePodTemplateHash and
// the pod is written back with r.Update.
//
// It returns the first error from the hash calculation or from r.Update and
// stops there; pods updated earlier in the loop stay updated. It returns nil
// when every eligible pod was updated.
1202+
func (r *RayClusterReconciler) updatePodsAnnotations(ctx context.Context, instance *rayv1.RayCluster, allPods *corev1.PodList) error {
1203+
logger := ctrl.LoggerFrom(ctx)
1204+
1205+
for i := range allPods.Items {
1206+
// Take the address of the slice element so the annotation changes are made on the list entry itself.
pod := &allPods.Items[i]
1207+
podVersion := pod.Annotations[utils.KubeRayVersion]
1208+
1209+
// Skip pods that are already current or that carry no version annotation at all.
if podVersion == utils.KUBERAY_VERSION || podVersion == "" {
1210+
continue
1211+
}
1212+
1213+
// NOTE(review): the hash is derived from the current RayCluster spec, not from the spec the pod
// was created with, so the stored hash will match the current spec after this update — confirm
// this is intended when the spec changed at the same time as the operator version.
newHash, err := r.calculatePodTemplateHash(instance, pod)
1214+
if err != nil {
1215+
return err
1216+
}
1217+
1218+
// NOTE(review): podVersion is non-empty at this point, so pod.Annotations cannot be nil here;
// this guard looks unreachable.
if pod.Annotations == nil {
1219+
pod.Annotations = make(map[string]string)
1220+
}
1221+
pod.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION
1222+
pod.Annotations[utils.PodTemplateHashKey] = newHash
1223+
1224+
if err := r.Update(ctx, pod); err != nil {
1225+
return err
1226+
}
1227+
1228+
logger.Info("Updated pod annotations", "pod", pod.Name, "version", utils.KUBERAY_VERSION)
1229+
}
1230+
1231+
return nil
1232+
}
1233+
1234+
// calculatePodTemplateHash returns the template hash that the current
// RayCluster spec yields for the group the given pod belongs to.
//
// The pod's utils.RayNodeTypeLabelKey label selects the template:
//   - head pods use instance.Spec.HeadGroupSpec.Template;
//   - worker pods use the template of the worker group whose GroupName equals
//     the pod's utils.RayNodeGroupLabelKey label.
//
// The hash itself comes from common.GeneratePodTemplateHash, whose result and
// error are returned unchanged. An error is also returned when a worker pod's
// group is not present in instance.Spec.WorkerGroupSpecs, or when the node
// type label is neither head nor worker (including a missing label).
//
// The receiver r is not used in the body.
1235+
func (r *RayClusterReconciler) calculatePodTemplateHash(instance *rayv1.RayCluster, pod *corev1.Pod) (string, error) {
1236+
nodeType := pod.Labels[utils.RayNodeTypeLabelKey]
1237+
1238+
switch rayv1.RayNodeType(nodeType) {
1239+
case rayv1.HeadNode:
1240+
return common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template)
1241+
1242+
case rayv1.WorkerNode:
1243+
groupName := pod.Labels[utils.RayNodeGroupLabelKey]
1244+
// Linear search for the worker group named by the pod's group label.
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
1245+
if workerGroup.GroupName == groupName {
1246+
return common.GeneratePodTemplateHash(workerGroup.Template)
1247+
}
1248+
}
1249+
return "", fmt.Errorf("worker group %s not found in RayCluster spec", groupName)
1250+
1251+
default:
1252+
return "", fmt.Errorf("unknown node type: %s", nodeType)
1253+
}
1254+
}
1255+
11861256
// shouldDeletePod returns whether the Pod should be deleted and the reason
11871257
//
11881258
// @param pod: The Pod to be checked.

ray-operator/controllers/ray/raycluster_controller_unit_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3668,6 +3668,7 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) {
36683668
},
36693669
Annotations: map[string]string{
36703670
utils.PodTemplateHashKey: templateHash,
3671+
utils.KubeRayVersion: utils.KUBERAY_VERSION,
36713672
},
36723673
},
36733674
Spec: corev1.PodSpec{
@@ -3679,6 +3680,13 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) {
36793680
}
36803681
}
36813682

3683+
// Helper function to create a pod with specific template hash and KubeRay version
3684+
createPodWithHashAndVersion := func(name string, nodeType rayv1.RayNodeType, groupName string, templateHash string, kuberayVersion string) *corev1.Pod {
3685+
pod := createPodWithHash(name, nodeType, groupName, templateHash)
3686+
pod.Annotations[utils.KubeRayVersion] = kuberayVersion
3687+
return pod
3688+
}
3689+
36823690
tests := []struct {
36833691
name string
36843692
upgradeStrategy *rayv1.RayClusterUpgradeStrategy
@@ -3746,6 +3754,17 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) {
37463754
},
37473755
expectedRecreate: true,
37483756
},
3757+
{
3758+
name: "Recreate strategy with different KubeRay version - should update annotations and not recreate",
3759+
upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{
3760+
Type: ptr.To(rayv1.RayClusterRecreate),
3761+
},
3762+
pods: []runtime.Object{
3763+
createPodWithHashAndVersion("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash", "v1.0.0"),
3764+
createPodWithHashAndVersion("worker-pod", rayv1.WorkerNode, groupNameStr, "old-worker-hash", "v1.0.0"),
3765+
},
3766+
expectedRecreate: false,
3767+
},
37493768
}
37503769

37513770
for _, tc := range tests {

0 commit comments

Comments
 (0)