
Commit e32405e

[RayService] Migrate from Endpoints API to EndpointSlice API for RayService (#4245)
* Migrate from Endpoints API to EndpointSlice API for RayService
* trigger test
* add back endpoints rule for backward compatibility
* add comment
* fix comment
* de-duplicate endpoint based on pod uid
* address comment
* change TODO message
* trigger test
* remove endpoint RBAC
* move comment
* change logging level

Signed-off-by: seanlaii <qazwsx0939059006@gmail.com>
1 parent: aa8d830
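The behavioral core of the change is the de-duplication called out in the commit message: unlike the legacy Endpoints object, a Service can own several EndpointSlices, and in a dual-stack (IPv4 + IPv6) cluster the same Pod can appear in more than one of them, so counting addresses would overcount serve endpoints. The standalone sketch below illustrates the UID-based counting pattern the controller change adopts; the slice names, IPs, and Pod UID are made up for illustration.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
)

// readyEndpoint builds a serve-ready Endpoint backed by the Pod with the given UID.
func readyEndpoint(addr string, uid types.UID) discoveryv1.Endpoint {
	return discoveryv1.Endpoint{
		Addresses:  []string{addr},
		Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
		TargetRef:  &corev1.ObjectReference{Kind: "Pod", Name: "head-pod", UID: uid},
	}
}

func main() {
	// Dual-stack scenario: the same Pod shows up in an IPv4 slice and an IPv6 slice.
	slices := []discoveryv1.EndpointSlice{
		{
			ObjectMeta:  metav1.ObjectMeta{Name: "serve-svc-ipv4"},
			AddressType: discoveryv1.AddressTypeIPv4,
			Endpoints:   []discoveryv1.Endpoint{readyEndpoint("10.9.8.7", "pod-uid-1")},
		},
		{
			ObjectMeta:  metav1.ObjectMeta{Name: "serve-svc-ipv6"},
			AddressType: discoveryv1.AddressTypeIPv6,
			Endpoints:   []discoveryv1.Endpoint{readyEndpoint("fd00::7", "pod-uid-1")},
		},
	}

	// Count each ready Pod once, keyed by TargetRef.UID, instead of counting addresses.
	uniqueReadyPods := make(map[types.UID]struct{})
	for _, slice := range slices {
		for _, ep := range slice.Endpoints {
			if ptr.Deref(ep.Conditions.Ready, false) && ep.TargetRef != nil && ep.TargetRef.UID != "" {
				uniqueReadyPods[ep.TargetRef.UID] = struct{}{}
			}
		}
	}
	fmt.Println("ready serve pods:", len(uniqueReadyPods)) // 1, not 2
}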

File tree

9 files changed (+159, -64 lines)


.buildkite/test-sample-yamls.yml

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@
 - kubectl config set clusters.kind-kind.server https://docker:6443
 # Deploy KubeRay operator
 - pushd ray-operator
-- IMG=quay.io/kuberay/operator:v1.5.1 make deploy
+- helm install kuberay-operator kuberay/kuberay-operator --version 1.5.1
 - kubectl wait --timeout=90s --for=condition=Available=true deployment kuberay-operator
 # Run sample YAML tests
 - echo "--- START:Running Sample YAMLs (latest release) tests"

helm-chart/kuberay-operator/templates/_helpers.tpl

Lines changed: 8 additions & 8 deletions
@@ -119,14 +119,6 @@ Create a template to ensure consistency for Role and ClusterRole.
 */}}
 {{- define "role.consistentRules" -}}
 rules:
-- apiGroups:
-  - ""
-  resources:
-  - endpoints
-  verbs:
-  - get
-  - list
-  - watch
 - apiGroups:
   - ""
   resources:
@@ -218,6 +210,14 @@ rules:
   - get
   - list
   - update
+- apiGroups:
+  - discovery.k8s.io
+  resources:
+  - endpointslices
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - extensions
   - networking.k8s.io

ray-operator/config/rbac/role.yaml

Lines changed: 8 additions & 8 deletions
@@ -4,14 +4,6 @@ kind: ClusterRole
 metadata:
   name: kuberay-operator
 rules:
-- apiGroups:
-  - ""
-  resources:
-  - endpoints
-  verbs:
-  - get
-  - list
-  - watch
 - apiGroups:
   - ""
   resources:
@@ -103,6 +95,14 @@ rules:
   - get
   - list
   - update
+- apiGroups:
+  - discovery.k8s.io
+  resources:
+  - endpointslices
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - extensions
   - networking.k8s.io
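With the rule above, the operator's ClusterRole now needs get/list/watch on endpointslices in the discovery.k8s.io group instead of the core endpoints resource. One hedged way to sanity-check the rendered permissions after upgrading (not part of this commit) is a SubjectAccessReview against the operator's service account; the service-account name kuberay-operator, the namespace default, and the kubeconfig path are assumptions, so adjust them to your install.

package main

import (
	"context"
	"fmt"

	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumed kubeconfig location; adjust as needed.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Ask the API server whether the operator's (assumed) service account may list EndpointSlices.
	review := &authorizationv1.SubjectAccessReview{
		Spec: authorizationv1.SubjectAccessReviewSpec{
			User: "system:serviceaccount:default:kuberay-operator",
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Group:     "discovery.k8s.io",
				Resource:  "endpointslices",
				Verb:      "list",
				Namespace: "default",
			},
		},
	}
	resp, err := clientset.AuthorizationV1().SubjectAccessReviews().Create(context.Background(), review, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("endpointslices list allowed:", resp.Status.Allowed)
}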

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 63 additions & 17 deletions
@@ -14,10 +14,12 @@ import (
 	"github.com/go-logr/logr"
 	cmap "github.com/orcaman/concurrent-map/v2"
 	corev1 "k8s.io/api/core/v1"
+	discoveryv1 "k8s.io/api/discovery/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/apimachinery/pkg/util/yaml"
 	"k8s.io/client-go/tools/record"
@@ -87,7 +89,6 @@ func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider ut
 // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=core,resources=pods/proxy,verbs=get;update;patch
-// +kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch
 // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=core,resources=services/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=core,resources=services/proxy,verbs=get;update;patch
@@ -97,6 +98,7 @@ func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider ut
 // +kubebuilder:rbac:groups="gateway.networking.k8s.io",resources=httproutes,verbs=get;list;watch;create;update;
 // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles,verbs=get;list;watch;create;delete;update
 // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=rolebindings,verbs=get;list;watch;create;delete
+// +kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;list;watch
 
 // [WARNING]: There MUST be a newline after kubebuilder markers.
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
@@ -443,33 +445,26 @@ func (r *RayServiceReconciler) calculateStatus(
 		}
 	}
 
-	serveEndPoints := &corev1.Endpoints{}
-	serveServiceName := common.RayServiceServeServiceNamespacedName(rayServiceInstance)
+	// Calculate the number of ready serve endpoints for the active cluster.
+	serveServiceNamespacedName := common.RayServiceServeServiceNamespacedName(rayServiceInstance)
 	if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) && activeCluster != nil {
 		// The Serve service name is based on the unique RayCluster name, since we use the
 		// per-cluster Serve services for traffic routing during an incremental upgrade.
-		serveServiceName.Name = utils.GenerateServeServiceName(activeCluster.Name)
+		serveServiceNamespacedName.Name = utils.GenerateServeServiceName(activeCluster.Name)
 	}
-	if err := r.Get(ctx, serveServiceName, serveEndPoints); err != nil && !errors.IsNotFound(err) {
+	numServeEndpoints, err := r.calculateNumServeEndpointsFromSlices(ctx, serveServiceNamespacedName)
+	if err != nil {
 		return err
 	}
 
-	numServeEndpoints := 0
-	// Ray Pod addresses are categorized into subsets based on the IPs they share.
-	// subset.Addresses contains a list of Ray Pod addresses with ready serve port.
-	for _, subset := range serveEndPoints.Subsets {
-		numServeEndpoints += len(subset.Addresses)
-	}
-
 	// During NewClusterWithIncrementalUpgrade, the pending RayCluster is also serving.
 	if utils.IsIncrementalUpgradeEnabled(&rayServiceInstance.Spec) && pendingCluster != nil {
-		pendingServeServiceName := common.RayClusterServeServiceNamespacedName(pendingCluster)
-		if err := r.Get(ctx, pendingServeServiceName, serveEndPoints); err != nil && !errors.IsNotFound(err) {
+		pendingServeServiceNamespacedName := common.RayClusterServeServiceNamespacedName(pendingCluster)
+		pendingEndpoints, err := r.calculateNumServeEndpointsFromSlices(ctx, pendingServeServiceNamespacedName)
+		if err != nil {
 			return err
 		}
-		for _, subset := range serveEndPoints.Subsets {
-			numServeEndpoints += len(subset.Addresses)
-		}
+		numServeEndpoints += pendingEndpoints
 	}
 
 	if numServeEndpoints > math.MaxInt32 {
@@ -1751,6 +1746,57 @@ func generateHashWithoutReplicasAndWorkersToDelete(rayClusterSpec rayv1.RayClust
 	return utils.GenerateJsonHash(updatedRayClusterSpec)
 }
 
+// calculateNumServeEndpointsFromSlices calculates the number of unique ready Pods
+// from EndpointSlices associated with a given service.
+//
+// In dual-stack environments (IPv4 + IPv6), the same Pod may appear in multiple
+// EndpointSlices with different IP addresses. This function deduplicates by TargetRef.UID
+// to ensure each Pod is counted only once, regardless of how many IP families it serves.
+//
+// An endpoint is considered ready if it has the `Ready` condition set to true.
+// This replaces the legacy Endpoints API approach.
+func (r *RayServiceReconciler) calculateNumServeEndpointsFromSlices(ctx context.Context, serviceNamespacedName client.ObjectKey) (int, error) {
+	logger := ctrl.LoggerFrom(ctx)
+
+	// List all EndpointSlices for the service.
+	// EndpointSlices are automatically created by Kubernetes and labeled with
+	// kubernetes.io/service-name to associate them with their parent Service.
+	endpointSliceList := &discoveryv1.EndpointSliceList{}
+	listOpts := []client.ListOption{
+		client.InNamespace(serviceNamespacedName.Namespace),
+		client.MatchingLabels{discoveryv1.LabelServiceName: serviceNamespacedName.Name},
+	}
+
+	if err := r.List(ctx, endpointSliceList, listOpts...); err != nil {
+		logger.Error(err, "Failed to list EndpointSlices", "serviceName", serviceNamespacedName.Name, "serviceNamespace", serviceNamespacedName.Namespace)
+		return 0, err
+	}
+
+	uniqueNumReadyPods := make(map[types.UID]struct{})
+
+	for _, endpointSlice := range endpointSliceList.Items {
+		for _, endpoint := range endpointSlice.Endpoints {
+			// Only count endpoints that are ready to serve traffic
+			if ptr.Deref(endpoint.Conditions.Ready, false) {
+				if endpoint.TargetRef != nil && endpoint.TargetRef.UID != "" {
+					uniqueNumReadyPods[endpoint.TargetRef.UID] = struct{}{}
+				}
+			}
+		}
+	}
+
+	numPods := len(uniqueNumReadyPods)
+
+	logger.V(1).Info("Counted serve-ready pods via EndpointSlices",
+		"serviceName", serviceNamespacedName.Name,
+		"serviceNamespace", serviceNamespacedName.Namespace,
+		"numSlices", len(endpointSliceList.Items),
+		"numReadyPods", numPods,
+	)
+
+	return numPods, nil
+}
+
 // isHeadPodRunningAndReady checks if the head pod of the RayCluster is running and ready.
 func (r *RayServiceReconciler) isHeadPodRunningAndReady(ctx context.Context, instance *rayv1.RayCluster) (bool, error) {
 	headPod, err := common.GetRayClusterHeadPod(ctx, r, instance)
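The new calculateNumServeEndpointsFromSlices above relies on the kubernetes.io/service-name label that Kubernetes sets on the EndpointSlices it manages, and lists slices through the controller-runtime client. The self-contained sketch below (not part of this commit) exercises the same namespace-plus-label query against controller-runtime's fake client; the object names and UID are illustrative.

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func main() {
	scheme := runtime.NewScheme()
	_ = discoveryv1.AddToScheme(scheme)

	// One EndpointSlice labeled for a hypothetical serve service "my-raysvc-serve-svc",
	// with a single ready endpoint backed by a Pod.
	slice := &discoveryv1.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "my-raysvc-serve-svc-abc12",
			Namespace: "default",
			Labels:    map[string]string{discoveryv1.LabelServiceName: "my-raysvc-serve-svc"},
		},
		AddressType: discoveryv1.AddressTypeIPv4,
		Endpoints: []discoveryv1.Endpoint{{
			Addresses:  []string{"10.9.8.7"},
			Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
			TargetRef:  &corev1.ObjectReference{Kind: "Pod", Name: "head", UID: "uid-1"},
		}},
	}

	c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(slice).Build()

	// The same namespace + service-name-label query the controller issues.
	list := &discoveryv1.EndpointSliceList{}
	err := c.List(context.Background(), list,
		client.InNamespace("default"),
		client.MatchingLabels{discoveryv1.LabelServiceName: "my-raysvc-serve-svc"},
	)
	fmt.Println(err, len(list.Items)) // expect: <nil> 1
}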

ray-operator/controllers/ray/rayservice_controller_test.go

Lines changed: 26 additions & 14 deletions
@@ -26,6 +26,7 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
+	discoveryv1 "k8s.io/api/discovery/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/util/retry"
@@ -119,18 +120,29 @@ func rayServiceTemplate(name string, namespace string, serveAppName string) *ray
 	}
 }
 
-func endpointsTemplate(name string, namespace string) *corev1.Endpoints {
-	return &corev1.Endpoints{
+func endpointSliceTemplate(name string, namespace string) *discoveryv1.EndpointSlice {
+	return &discoveryv1.EndpointSlice{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
 			Namespace: namespace,
+			Labels: map[string]string{
+				discoveryv1.LabelServiceName: name,
+			},
 		},
-		Subsets: []corev1.EndpointSubset{
+		AddressType: discoveryv1.AddressTypeIPv4,
+		Endpoints: []discoveryv1.Endpoint{
 			{
-				Addresses: []corev1.EndpointAddress{
-					{
-						IP: "10.9.8.7",
-					},
+				Addresses: []string{"10.9.8.7"},
+				Conditions: discoveryv1.EndpointConditions{
+					Ready: ptr.To(true),
+				},
+				// TargetRef should always be set in production to identify the backing Pod.
+				// This allows proper deduplication when a Pod appears in multiple EndpointSlices.
+				TargetRef: &corev1.ObjectReference{
+					Kind:      "Pod",
+					Namespace: namespace,
+					Name:      "test-pod-1",
+					UID:       "test-uid-12345",
 				},
 			},
 		},
@@ -275,7 +287,7 @@ var _ = Context("RayService env tests", func() {
 	ctx := context.Background()
 	var rayService *rayv1.RayService
 	var rayCluster *rayv1.RayCluster
-	var endpoints *corev1.Endpoints
+	var endpointSlice *discoveryv1.EndpointSlice
 	serveAppName := "app1"
 	namespace := "default"
 
@@ -363,9 +375,9 @@ var _ = Context("RayService env tests", func() {
 			// TODO: Verify the serve service by checking labels and annotations.
 
 			By("The RayServiceReady condition should be true when the number of endpoints is greater than 0")
-			endpoints = endpointsTemplate(utils.GenerateServeServiceName(rayService.Name), namespace)
-			err = k8sClient.Create(ctx, endpoints)
-			Expect(err).NotTo(HaveOccurred(), "failed to create Endpoints resource")
+			endpointSlice = endpointSliceTemplate(utils.GenerateServeServiceName(rayService.Name), namespace)
+			err = k8sClient.Create(ctx, endpointSlice)
+			Expect(err).NotTo(HaveOccurred(), "failed to create EndpointSlice resource")
 			Eventually(func() int32 {
 				if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: namespace}, rayService); err != nil {
 					return 0
@@ -379,9 +391,9 @@ var _ = Context("RayService env tests", func() {
 			By(fmt.Sprintf("Delete the RayService custom resource %v", rayService.Name))
 			err := k8sClient.Delete(ctx, rayService)
 			Expect(err).NotTo(HaveOccurred(), "failed to delete the test RayService resource")
-			By(fmt.Sprintf("Delete the Endpoints %v", endpoints.Name))
-			err = k8sClient.Delete(ctx, endpoints)
-			Expect(err).NotTo(HaveOccurred(), "failed to delete the test Endpoints resource")
+			By(fmt.Sprintf("Delete the EndpointSlice %v", endpointSlice.Name))
+			err = k8sClient.Delete(ctx, endpointSlice)
+			Expect(err).NotTo(HaveOccurred(), "failed to delete the test EndpointSlice resource")
 		})
 
 		When("Testing in-place update: updating the serveConfigV2", Ordered, func() {

ray-operator/test/e2erayservice/rayservice_redeploy_test.go

Lines changed: 3 additions & 4 deletions
@@ -59,11 +59,10 @@ func TestRedeployRayServe(t *testing.T) {
 	LogWithTimestamp(test.T(), "Checking that the K8s serve service eventually has 1 endpoint and the endpoint is not the old head Pod")
 	g.Eventually(func(g Gomega) {
 		svcName := utils.GenerateServeServiceName(rayService.Name)
-		endpoints, err := test.Client().Core().CoreV1().Endpoints(namespace.Name).Get(test.Ctx(), svcName, metav1.GetOptions{})
+		readyEndpoints, err := GetReadyEndpointsFromSlices(test.Ctx(), test.Client(), namespace.Name, svcName)
 		g.Expect(err).NotTo(HaveOccurred())
-		g.Expect(endpoints.Subsets).To(HaveLen(1))
-		g.Expect(endpoints.Subsets[0].Addresses).To(HaveLen(1))
-		g.Expect(endpoints.Subsets[0].Addresses[0].TargetRef.UID).NotTo(Equal(oldHeadPod.UID))
+		g.Expect(readyEndpoints).To(HaveLen(1))
+		g.Expect(readyEndpoints[0].TargetRefUID).NotTo(Equal(oldHeadPod.UID))
 	}, TestTimeoutMedium).Should(Succeed())
 
 	LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be ready", rayService.Namespace, rayService.Name)

ray-operator/test/e2erayservice/rayservice_upgrade_test.go

Lines changed: 8 additions & 8 deletions
@@ -54,11 +54,11 @@ func TestOldHeadPodFailDuringUpgrade(t *testing.T) {
 
 	svcName := utils.GenerateServeServiceName(rayService.Name)
 	LogWithTimestamp(test.T(), "Checking that the K8s serve service %s has exactly one endpoint because the cluster only has a head Pod", svcName)
-	endpoints, err := test.Client().Core().CoreV1().Endpoints(namespace.Name).Get(test.Ctx(), svcName, metav1.GetOptions{})
+	readyEndpoints, err := GetReadyEndpointsFromSlices(test.Ctx(), test.Client(), namespace.Name, svcName)
 	g.Expect(err).NotTo(HaveOccurred())
-	g.Expect(endpoints.Subsets).To(HaveLen(1))
-	g.Expect(endpoints.Subsets[0].Addresses).To(HaveLen(1))
-	headPodName := endpoints.Subsets[0].Addresses[0].TargetRef.Name
+	g.Expect(readyEndpoints).To(HaveLen(1))
+	headPodName := readyEndpoints[0].TargetRefName
+	headPodUID := readyEndpoints[0].TargetRefUID
 
 	LogWithTimestamp(test.T(), "Upgrading the RayService to trigger a zero downtime upgrade")
 	rayService, err = GetRayService(test, namespace.Name, rayService.Name)
@@ -118,11 +118,11 @@ func TestOldHeadPodFailDuringUpgrade(t *testing.T) {
 
 	LogWithTimestamp(test.T(), "Checking that the K8s serve service eventually has 1 endpoint and the endpoint is not the old head Pod")
 	g.Eventually(func(g Gomega) {
-		endpoints, err = test.Client().Core().CoreV1().Endpoints(namespace.Name).Get(test.Ctx(), svcName, metav1.GetOptions{})
+		readyEndpoints, err := GetReadyEndpointsFromSlices(test.Ctx(), test.Client(), namespace.Name, svcName)
 		g.Expect(err).NotTo(HaveOccurred())
-		g.Expect(endpoints.Subsets).To(HaveLen(1))
-		g.Expect(endpoints.Subsets[0].Addresses).To(HaveLen(1))
-		g.Expect(endpoints.Subsets[0].Addresses[0].TargetRef.Name).NotTo(Equal(headPodName))
+		g.Expect(readyEndpoints).To(HaveLen(1))
+		g.Expect(readyEndpoints[0].TargetRefName).NotTo(Equal(headPodName))
+		g.Expect(readyEndpoints[0].TargetRefUID).NotTo(Equal(headPodUID))
 	}, TestTimeoutMedium).Should(Succeed())
 
 	LogWithTimestamp(test.T(), "Waiting for RayService %s/%s UpgradeInProgress condition to be false", rayService.Namespace, rayService.Name)

ray-operator/test/e2eupgrade/rayservice_operator_upgrade_test.go

Lines changed: 3 additions & 4 deletions
@@ -64,11 +64,10 @@ func TestZeroDowntimeUpgradeAfterOperatorUpgrade(t *testing.T) {
 
 	// Validate RayService serve service correctly configured
 	svcName := utils.GenerateServeServiceName(rayService.Name)
-	test.T().Logf("Checking that the K8s serve service %s has exactly one endpoint and two addresses", svcName)
-	endpoints, err := test.Client().Core().CoreV1().Endpoints(namespace.Name).Get(test.Ctx(), svcName, metav1.GetOptions{})
+	test.T().Logf("Checking that the K8s serve service %s has two ready endpoints", svcName)
+	readyEndpoints, err := GetReadyEndpointsFromSlices(test.Ctx(), test.Client(), namespace.Name, svcName)
 	g.Expect(err).NotTo(HaveOccurred())
-	g.Expect(endpoints.Subsets).To(HaveLen(1))
-	g.Expect(endpoints.Subsets[0].Addresses).To(HaveLen(2))
+	g.Expect(readyEndpoints).To(HaveLen(2))
 
 	// Upgrade KubeRay operator to latest version and replace CRDs
 	test.T().Logf("Upgrading the KubeRay operator to %s", upgradeVersion)

ray-operator/test/support/utils.go

Lines changed: 39 additions & 0 deletions
@@ -1,11 +1,16 @@
 package support
 
 import (
+	"context"
 	"io/fs"
 	"os"
 	"path"
 
 	"github.com/stretchr/testify/require"
+	discoveryv1 "k8s.io/api/discovery/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
 )
 
 func Ptr[T any](v T) *T {
@@ -23,3 +28,37 @@ func WriteToOutputDir(t Test, fileName string, fileType OutputType, data []byte)
 	err := os.WriteFile(path.Join(t.OutputDir(), fileName+"."+string(fileType)), data, fs.ModePerm)
 	require.NoError(t.T(), err)
 }
+
+// EndpointInfo contains information about a ready endpoint from an EndpointSlice.
+type EndpointInfo struct {
+	TargetRefName string
+	TargetRefUID  types.UID
+}
+
+// GetReadyEndpointsFromSlices retrieves all ready endpoints from EndpointSlices
+// associated with the given service name in the specified namespace.
+// It returns a slice of EndpointInfo containing target reference names and UIDs of ready endpoints.
+func GetReadyEndpointsFromSlices(ctx context.Context, client Client, namespace, serviceName string) ([]EndpointInfo, error) {
+	endpointSliceList, err := client.Core().DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: discoveryv1.LabelServiceName + "=" + serviceName,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	var readyEndpoints []EndpointInfo
+	for _, slice := range endpointSliceList.Items {
+		for _, endpoint := range slice.Endpoints {
+			if ptr.Deref(endpoint.Conditions.Ready, false) {
+				if endpoint.TargetRef != nil && endpoint.TargetRef.UID != "" {
+					readyEndpoints = append(readyEndpoints, EndpointInfo{
+						TargetRefName: endpoint.TargetRef.Name,
+						TargetRefUID:  endpoint.TargetRef.UID,
+					})
+				}
+			}
+		}
+	}
+
+	return readyEndpoints, nil
+}
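GetReadyEndpointsFromSlices builds its label selector by plain string concatenation, which is fine for a fixed label key and a DNS-safe service name. For illustration only (not part of this commit), an equivalent selector string can be produced with the labels helper from apimachinery; the service name my-serve-svc is made up.

package main

import (
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Equivalent to discoveryv1.LabelServiceName + "=" + serviceName above.
	selector := labels.Set{discoveryv1.LabelServiceName: "my-serve-svc"}.String()
	fmt.Println(selector) // kubernetes.io/service-name=my-serve-svc
}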
