Skip to content

Commit ec81059

Browse files
committed
feat(zrpc): migrate kube resolver from Endpoints to EndpointSlice API
* Replace core/v1 Endpoints with discovery/v1 EndpointSlice in event handler * Update handler logic to iterate over EndpointSlice.Endpoints[].Addresses[] * Switch informer to watch EndpointSlices using label selector "kubernetes.io/service-name" * Retrieve service port from the first EndpointSlice port when not specified * Rename constant `nameSelector` → `serviceSelector` for clarity * Improve error handling for missing EndpointSlices Signed-off-by: soasurs <soasurs@gmail.com>
1 parent 937cf0d commit ec81059

File tree

2 files changed

+52
-34
lines changed

2 files changed

+52
-34
lines changed

zrpc/resolver/internal/kube/eventhandler.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55

66
"github.com/zeromicro/go-zero/core/lang"
77
"github.com/zeromicro/go-zero/core/logx"
8-
v1 "k8s.io/api/core/v1"
8+
discoveryv1 "k8s.io/api/discovery/v1"
99
"k8s.io/client-go/tools/cache"
1010
)
1111

@@ -28,20 +28,23 @@ func NewEventHandler(update func([]string)) *EventHandler {
2828

2929
// OnAdd handles the endpoints add events.
3030
func (h *EventHandler) OnAdd(obj any, _ bool) {
31-
endpoints, ok := obj.(*v1.Endpoints)
31+
endpoints, ok := obj.(*discoveryv1.EndpointSlice)
3232
if !ok {
33-
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
33+
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", obj)
3434
return
3535
}
3636

3737
h.lock.Lock()
3838
defer h.lock.Unlock()
3939

4040
var changed bool
41-
for _, sub := range endpoints.Subsets {
42-
for _, point := range sub.Addresses {
43-
if _, ok := h.endpoints[point.IP]; !ok {
44-
h.endpoints[point.IP] = lang.Placeholder
41+
for _, point := range endpoints.Endpoints {
42+
if len(point.Addresses) == 0 {
43+
continue
44+
}
45+
for _, address := range point.Addresses {
46+
if _, ok := h.endpoints[address]; !ok {
47+
h.endpoints[address] = lang.Placeholder
4548
changed = true
4649
}
4750
}
@@ -54,20 +57,23 @@ func (h *EventHandler) OnAdd(obj any, _ bool) {
5457

5558
// OnDelete handles the endpoints delete events.
5659
func (h *EventHandler) OnDelete(obj any) {
57-
endpoints, ok := obj.(*v1.Endpoints)
60+
endpoints, ok := obj.(*discoveryv1.EndpointSlice)
5861
if !ok {
59-
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
62+
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", obj)
6063
return
6164
}
6265

6366
h.lock.Lock()
6467
defer h.lock.Unlock()
6568

6669
var changed bool
67-
for _, sub := range endpoints.Subsets {
68-
for _, point := range sub.Addresses {
69-
if _, ok := h.endpoints[point.IP]; ok {
70-
delete(h.endpoints, point.IP)
70+
for _, point := range endpoints.Endpoints {
71+
if len(point.Addresses) == 0 {
72+
continue
73+
}
74+
for _, address := range point.Addresses {
75+
if _, ok := h.endpoints[address]; ok {
76+
delete(h.endpoints, address)
7177
changed = true
7278
}
7379
}
@@ -80,15 +86,15 @@ func (h *EventHandler) OnDelete(obj any) {
8086

8187
// OnUpdate handles the endpoints update events.
8288
func (h *EventHandler) OnUpdate(oldObj, newObj any) {
83-
oldEndpoints, ok := oldObj.(*v1.Endpoints)
89+
oldEndpoints, ok := oldObj.(*discoveryv1.EndpointSlice)
8490
if !ok {
85-
logx.Errorf("%v is not an object with type *v1.Endpoints", oldObj)
91+
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", oldObj)
8692
return
8793
}
8894

89-
newEndpoints, ok := newObj.(*v1.Endpoints)
95+
newEndpoints, ok := newObj.(*discoveryv1.EndpointSlice)
9096
if !ok {
91-
logx.Errorf("%v is not an object with type *v1.Endpoints", newObj)
97+
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", newObj)
9298
return
9399
}
94100

@@ -100,15 +106,18 @@ func (h *EventHandler) OnUpdate(oldObj, newObj any) {
100106
}
101107

102108
// Update updates the endpoints.
103-
func (h *EventHandler) Update(endpoints *v1.Endpoints) {
109+
func (h *EventHandler) Update(endpoints *discoveryv1.EndpointSlice) {
104110
h.lock.Lock()
105111
defer h.lock.Unlock()
106112

107113
old := h.endpoints
108114
h.endpoints = make(map[string]lang.PlaceholderType)
109-
for _, sub := range endpoints.Subsets {
110-
for _, point := range sub.Addresses {
111-
h.endpoints[point.IP] = lang.Placeholder
115+
for _, point := range endpoints.Endpoints {
116+
if len(point.Addresses) == 0 {
117+
continue
118+
}
119+
for _, address := range point.Addresses {
120+
h.endpoints[address] = lang.Placeholder
112121
}
113122
}
114123

zrpc/resolver/internal/kubebuilder.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import (
1818
)
1919

2020
const (
21-
resyncInterval = 5 * time.Minute
22-
nameSelector = "metadata.name="
21+
resyncInterval = 5 * time.Minute
22+
serviceSelector = "kubernetes.io/service-name="
2323
)
2424

2525
type kubeResolver struct {
@@ -60,14 +60,19 @@ func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn,
6060
}
6161

6262
if svc.Port == 0 {
63-
// getting endpoints is only to get the port
64-
endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(
65-
context.Background(), svc.Name, v1.GetOptions{})
63+
endpointSlices, err := cs.DiscoveryV1().EndpointSlices(svc.Namespace).List(context.Background(), v1.ListOptions{
64+
LabelSelector: serviceSelector + svc.Name,
65+
})
6666
if err != nil {
6767
return nil, err
6868
}
69+
if len(endpointSlices.Items) == 0 {
70+
return nil, fmt.Errorf("no endpoint slices found for service %s in namespace %s", svc.Name, svc.Namespace)
71+
}
6972

70-
svc.Port = int(endpoints.Subsets[0].Ports[0].Port)
73+
// Since this resolver is used for in-cluster service discovery,
74+
// there should be at least one port available.
75+
svc.Port = int(*endpointSlices.Items[0].Ports[0].Port)
7176
}
7277

7378
handler := kube.NewEventHandler(func(endpoints []string) {
@@ -88,23 +93,27 @@ func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn,
8893
inf := informers.NewSharedInformerFactoryWithOptions(cs, resyncInterval,
8994
informers.WithNamespace(svc.Namespace),
9095
informers.WithTweakListOptions(func(options *v1.ListOptions) {
91-
options.FieldSelector = nameSelector + svc.Name
96+
options.LabelSelector = serviceSelector + svc.Name
9297
}))
93-
in := inf.Core().V1().Endpoints()
98+
in := inf.Discovery().V1().EndpointSlices()
9499
_, err = in.Informer().AddEventHandler(handler)
95100
if err != nil {
96101
return nil, err
97102
}
98103

99-
// get the initial endpoints, cannot use the previous endpoints,
100-
// because the endpoints may be updated before/after the informer is started.
101-
endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(
102-
context.Background(), svc.Name, v1.GetOptions{})
104+
// get the initial endpoint slices, cannot use the previous endpoint slices,
105+
// because the endpoint slices may be updated before/after the informer is started.
106+
endpointSlices, err := cs.DiscoveryV1().EndpointSlices(svc.Namespace).List(
107+
context.Background(), v1.ListOptions{
108+
LabelSelector: serviceSelector + svc.Name,
109+
})
103110
if err != nil {
104111
return nil, err
105112
}
106113

107-
handler.Update(endpoints)
114+
for _, endpointSlice := range endpointSlices.Items {
115+
handler.Update(&endpointSlice)
116+
}
108117

109118
r := &kubeResolver{
110119
cc: cc,

0 commit comments

Comments
 (0)