Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions zrpc/resolver/internal/kube/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -28,20 +28,20 @@ func NewEventHandler(update func([]string)) *EventHandler {

// OnAdd handles the endpoints add events.
func (h *EventHandler) OnAdd(obj any, _ bool) {
endpoints, ok := obj.(*v1.Endpoints)
endpoints, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", obj)
return
}

h.lock.Lock()
defer h.lock.Unlock()

var changed bool
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
if _, ok := h.endpoints[point.IP]; !ok {
h.endpoints[point.IP] = lang.Placeholder
for _, point := range endpoints.Endpoints {
for _, address := range point.Addresses {
if _, ok := h.endpoints[address]; !ok {
h.endpoints[address] = lang.Placeholder
changed = true
}
}
Expand All @@ -54,20 +54,20 @@ func (h *EventHandler) OnAdd(obj any, _ bool) {

// OnDelete handles the endpoints delete events.
func (h *EventHandler) OnDelete(obj any) {
endpoints, ok := obj.(*v1.Endpoints)
endpoints, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", obj)
return
}

h.lock.Lock()
defer h.lock.Unlock()

var changed bool
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
if _, ok := h.endpoints[point.IP]; ok {
delete(h.endpoints, point.IP)
for _, point := range endpoints.Endpoints {
for _, address := range point.Addresses {
if _, ok := h.endpoints[address]; ok {
delete(h.endpoints, address)
changed = true
}
}
Expand All @@ -80,35 +80,35 @@ func (h *EventHandler) OnDelete(obj any) {

// OnUpdate handles the endpoints update events.
func (h *EventHandler) OnUpdate(oldObj, newObj any) {
oldEndpoints, ok := oldObj.(*v1.Endpoints)
oldEndpointSlices, ok := oldObj.(*discoveryv1.EndpointSlice)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", oldObj)
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", oldObj)
return
}

newEndpoints, ok := newObj.(*v1.Endpoints)
newEndpointSlices, ok := newObj.(*discoveryv1.EndpointSlice)
if !ok {
logx.Errorf("%v is not an object with type *v1.Endpoints", newObj)
logx.Errorf("%v is not an object with type *discoveryv1.EndpointSlice", newObj)
return
}

if oldEndpoints.ResourceVersion == newEndpoints.ResourceVersion {
if oldEndpointSlices.ResourceVersion == newEndpointSlices.ResourceVersion {
return
}

h.Update(newEndpoints)
h.Update(newEndpointSlices)
}

// Update updates the endpoints.
func (h *EventHandler) Update(endpoints *v1.Endpoints) {
func (h *EventHandler) Update(endpoints *discoveryv1.EndpointSlice) {
h.lock.Lock()
defer h.lock.Unlock()

old := h.endpoints
h.endpoints = make(map[string]lang.PlaceholderType)
for _, sub := range endpoints.Subsets {
for _, point := range sub.Addresses {
h.endpoints[point.IP] = lang.Placeholder
for _, point := range endpoints.Endpoints {
for _, address := range point.Addresses {
h.endpoints[address] = lang.Placeholder
}
}

Expand Down
Loading