From ed365be4e6d3fcef7ea171e0db5da930272497a5 Mon Sep 17 00:00:00 2001 From: Tom Date: Sun, 29 Jun 2025 17:52:03 +0000 Subject: [PATCH 01/14] Add Workload DnsController Signed-off-by: Tom --- pkg/controller/controller.go | 5 +- pkg/controller/workload/dns.go | 230 ++++++++++++++++++ .../workload/workload_controller.go | 20 +- pkg/controller/workload/workload_processor.go | 67 +++-- 4 files changed, 298 insertions(+), 24 deletions(-) create mode 100644 pkg/controller/workload/dns.go diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9e2b90862..65e89f2d4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -162,9 +162,8 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling) if c.client.WorkloadController != nil { - if err := c.client.WorkloadController.Run(ctx); err != nil { - return fmt.Errorf("failed to start workload controller: %+v", err) - } + c.client.WorkloadController.StartDnsController(stopCh) + c.client.WorkloadController.Run(ctx) } else { c.client.AdsController.StartDnsController(stopCh) } diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go new file mode 100644 index 000000000..cce4639ed --- /dev/null +++ b/pkg/controller/workload/dns.go @@ -0,0 +1,230 @@ +package workload + +import ( + "net" + "net/netip" + "sync" + "time" + + "google.golang.org/protobuf/proto" + "kmesh.net/kmesh/api/v2/workloadapi" + "kmesh.net/kmesh/pkg/controller/workload/cache" + "kmesh.net/kmesh/pkg/dns" +) + +type dnsController struct { + workloadsChan chan []*workloadapi.Workload + cache cache.WorkloadCache + dnsResolver *dns.DNSResolver + // store the copy of pendingResolveWorkload. + workloadCache map[string]*pendingResolveDomain + // store all pending hostnames in the workloads + pendingHostnames map[string][]string + sync.RWMutex +} + +// pending resolve domain info of Dual-Engine Mode, +// workload is used for create the apiworkload +type pendingResolveDomain struct { + Workloads []*workloadapi.Workload + RefreshRate time.Duration +} + +func NewDnsController(cache cache.WorkloadCache) (*dnsController, error) { + resolver, err := dns.NewDNSResolver() + if err != nil { + return nil, err + } + return &dnsController{ + workloadsChan: make(chan []*workloadapi.Workload), + cache: cache, + dnsResolver: resolver, + workloadCache: make(map[string]*pendingResolveDomain), + pendingHostnames: make(map[string][]string), + }, nil +} + +func (r *dnsController) Run(stopCh <-chan struct{}) { + go r.dnsResolver.StartDnsResolver(stopCh) + go r.refreshWorker(stopCh) + go r.processWorkloads() + go func() { + <-stopCh + close(r.workloadsChan) + }() +} + +func (r *dnsController) processWorkloads() { + for workloads := range r.workloadsChan { + r.processDomains(workloads) + } +} + +func (r *dnsController) processDomains(workload []*workloadapi.Workload) { + domains := getPendingResolveDomain(workload) + + // store all pending hostnames of clusters in pendingHostnames + for _, workload := range workload { + workloadName := workload.GetName() + hostname := workload.GetHostname() + r.pendingHostnames[workloadName] = []string{hostname} + r.workloadCache[hostname] = &pendingResolveDomain{ + Workloads: []*workloadapi.Workload{workload}, + RefreshRate: 15 * time.Second, + } + } + + // delete any scheduled re-resolve for domains we no longer care about + r.dnsResolver.RemoveUnwatchDomain(domains) + + // update workloadCache with pendingResolveWorkload + for k, v := range domains { + addresses := r.dnsResolver.GetDNSAddresses(k) + if addresses != nil { + go r.updateWorkloads(v.(*pendingResolveDomain), k, addresses) + } else { + // Initialize the newly added hostname + // and add it to the dns queue to be resolved. + domainInfo := &dns.DomainInfo{ + Domain: k, + RefreshRate: v.(*pendingResolveDomain).RefreshRate, + } + r.dnsResolver.AddDomainInQueue(domainInfo, 0) + } + } +} + +func (r *dnsController) refreshWorker(stop <-chan struct{}) { + for { + select { + case <-stop: + return + case domain := <-r.dnsResolver.DnsChan: + pendingDomain := r.getWorkloadsByDomain(domain) + // log.Infof("dnsController refreshWorker: domain %s, pendingDomain %v", domain, pendingDomain) + addrs := r.dnsResolver.GetDNSAddresses(domain) + maxRetry := 3 + for range maxRetry { + if len(addrs) > 0 { + r.updateWorkloads(pendingDomain, domain, addrs) + } + time.Sleep(1 * time.Second) + } + } + } +} + +func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, domain string, addrs []string) { + isWorkerUpdate := false + if pendingDomain == nil || addrs == nil{ + return + } + log.Infof("dnsController updateWorkloads: pendingDomain %v, domain %s, addrs %v", pendingDomain, domain, addrs) + + for _, workload := range pendingDomain.Workloads { + if ready, newWorkload := r.overwriteDnsWorkload(workload, domain, addrs); ready { + // log.Infof("dnsController update cache for workload %s with addresses %v", workload.ResourceName(), addrs) + if r.cache.GetWorkloadByUid(workload.GetUid()) != nil { + r.cache.AddOrUpdateWorkload(newWorkload) + delete(r.workloadCache, domain) + isWorkerUpdate = true + } + } + } + + if isWorkerUpdate { + // w.cache.Flush() + } +} + +func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) { + ready := true + hostNames := r.pendingHostnames[workload.GetName()] + addressesOfHostname := make(map[string][]string) + + for _, hostName := range hostNames { + // log.Infof("overwriteDnsWorkload: checking hostname %s for workload %s with domain %s", hostName, workload.ResourceName(), domain) + addresses := r.dnsResolver.GetDNSAddresses(hostName) + // There are hostnames in this Cluster that are not resolved. + if addresses != nil { + addressesOfHostname[hostName] = addresses + } else { + ready = false + } + } + + if ready { + // log.Infof("overwriteDnsWorkload ready for workload %s with domain %s", workload.ResourceName(), domain) + newWorkload := cloneWorkload(workload) + for _, addr := range addrs { + if ip := net.ParseIP(addr); ip != nil && ip.To4() != nil { + newWorkload.Addresses = append(newWorkload.Addresses, netip.MustParseAddr(addr).AsSlice()) + } + } + + return ready, newWorkload + } + + return ready, nil +} + +func getPendingResolveDomain(workloads []*workloadapi.Workload) map[string]any { + domains := make(map[string]any) + + for _, workload := range workloads { + hostname := workload.GetHostname() + if hostname == "" { + continue + } + + if _, err := netip.ParseAddr(hostname); err == nil { + // This is an ip address + continue + } + + // log.Infof("getPendingResolveDomain: processing workload %s with hostname %s", workload.ResourceName(), hostname) + if v, ok := domains[hostname]; ok { + v.(*pendingResolveDomain).Workloads = append(v.(*pendingResolveDomain).Workloads, workload) + } else { + + domainWithRefreshRate := &pendingResolveDomain{ + Workloads: []*workloadapi.Workload{workload}, + RefreshRate: 15 * time.Second, + } + domains[hostname] = domainWithRefreshRate + } + } + + return domains +} + +func (r *dnsController) newWorkloadCache() { + r.Lock() + defer r.Unlock() + + if r.workloadCache != nil { + log.Debug("clean up dns workloads") + r.workloadCache = map[string]*pendingResolveDomain{} + return + } +} + +func (r *dnsController) getWorkloadsByDomain(domain string) *pendingResolveDomain { + r.RLock() + defer r.RUnlock() + + if r.workloadCache != nil { + if v, ok := r.workloadCache[domain]; ok { + return v + } + } + return nil +} + +func cloneWorkload(workload *workloadapi.Workload) *workloadapi.Workload { + if workload == nil { + return nil + } + workloadCopy := proto.Clone(workload).(*workloadapi.Workload) + return workloadCopy +} diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 3facbe89b..0ce36fdd8 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -40,6 +40,7 @@ var log = logger.NewLoggerScope("workload_controller") type Controller struct { Stream discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesClient Processor *Processor + dnsResolverController *dnsController Rbac *auth.Rbac MetricController *telemetry.MetricController MapMetricController *telemetry.MapMetricController @@ -48,9 +49,17 @@ type Controller struct { } func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfMonitor bool) *Controller { + processor := NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps) + dnsResolverController, err := NewDnsController(processor.WorkloadCache) + if err != nil { + log.Errorf("dns resolver of Dual-Engine mode create failed: %v", err) + return nil + } + processor.DnsResolverChan = dnsResolverController.workloadsChan c := &Controller{ - Processor: NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps), - bpfWorkloadObj: bpfWorkload, + dnsResolverController: dnsResolverController, + Processor: processor, + bpfWorkloadObj: bpfWorkload, } // do some initialization when restart // restore endpoint index, otherwise endpoint number can double @@ -151,6 +160,7 @@ func (c *Controller) HandleWorkloadStream() error { return fmt.Errorf("stream recv failed, %s", err) } + // c.dnsResolverController.newWorkloadCache() c.Processor.processWorkloadResponse(rspDelta, c.Rbac) if err = c.Stream.Send(c.Processor.ack); err != nil { @@ -197,3 +207,9 @@ func (c *Controller) SetConnectionMetricTrigger(enable bool) { func (c *Controller) GetConnectionMetricTrigger() bool { return c.MetricController.EnableConnectionMetric.Load() } + +func (c *Controller) StartDnsController(stopCh <-chan struct{}) { + if c.dnsResolverController != nil { + c.dnsResolverController.Run(stopCh) + } +} diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index e181f7d65..a04d07492 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -23,6 +23,7 @@ import ( "sort" "strings" "sync" + "time" service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/protobuf/proto" @@ -31,7 +32,6 @@ import ( "istio.io/istio/pkg/util/sets" "kmesh.net/kmesh/api/v2/workloadapi" - "kmesh.net/kmesh/api/v2/workloadapi/security" security_v2 "kmesh.net/kmesh/api/v2/workloadapi/security" bpf2go "kmesh.net/kmesh/bpf/kmesh/bpf2go/dualengine" "kmesh.net/kmesh/pkg/auth" @@ -40,7 +40,6 @@ import ( "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/config" "kmesh.net/kmesh/pkg/controller/telemetry" - "kmesh.net/kmesh/pkg/controller/workload/bpfcache" bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache" "kmesh.net/kmesh/pkg/controller/workload/cache" "kmesh.net/kmesh/pkg/nets" @@ -64,6 +63,8 @@ type Processor struct { WaypointCache cache.WaypointCache locality bpf.LocalityCache + DnsResolverChan chan []*workloadapi.Workload + once sync.Once authzOnce sync.Once @@ -223,7 +224,7 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error { nets.CopyIpByteFromSlice(&fk.Ip, ip) fv.UpstreamId = uid if err := p.bpf.FrontendUpdate(&fk, &fv); err != nil { - return fmt.Errorf("Update frontend map failed, err:%s", err) + return fmt.Errorf("update frontend map failed, err:%s", err) } return nil @@ -388,7 +389,7 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na } // addWorkloadToService update service & endpoint bpf map when a workload has new bound services -func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, workloadUid uint32, priority uint32) (error, bpf.EndpointKey) { +func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, workloadUid uint32, priority uint32) (bpf.EndpointKey, error) { var ( ek = bpf.EndpointKey{} ev = bpf.EndpointValue{} @@ -401,14 +402,14 @@ func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValu ev.BackendUid = workloadUid if err := p.bpf.EndpointUpdate(&ek, &ev); err != nil { log.Errorf("Update endpoint map failed, err:%s", err) - return err, ek + return ek, err } p.EndpointCache.AddEndpointToService(cache.Endpoint{ServiceId: ek.ServiceId, Prio: ek.Prio, BackendIndex: ek.BackendIndex}, ev.BackendUid) if err := p.bpf.ServiceUpdate(sk, sv); err != nil { log.Errorf("Update ServiceUpdate map failed, err:%s", err) - return err, ek + return ek, err } - return nil, ek + return ek, nil } // handleWorkloadUnboundServices handles when a workload's belonging services removed @@ -440,7 +441,7 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa if err := p.bpf.ServiceLookup(&sk, &sv); err == nil { if sv.LbPolicy == uint32(workloadapi.LoadBalancing_UNSPECIFIED_MODE) { // random mode // In random mode, we save all workload to max priority group - if err, _ = p.addWorkloadToService(&sk, &sv, workloadId, 0); err != nil { + if _, err = p.addWorkloadToService(&sk, &sv, workloadId, 0); err != nil { log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err) return err } @@ -448,7 +449,7 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa service := p.ServiceCache.GetService(p.hashName.NumToStr(svcUid)) if p.locality.LocalityInfo != nil && service != nil { prio := p.locality.CalcLocalityLBPrio(workload, service.LoadBalancing.GetRoutingPreference()) - if err, _ = p.addWorkloadToService(&sk, &sv, workloadId, prio); err != nil { + if _, err = p.addWorkloadToService(&sk, &sv, workloadId, prio); err != nil { log.Errorf("addWorkloadToService workload %d service %d priority %d failed: %v", workloadId, sk.ServiceId, prio, err) return err } @@ -687,10 +688,10 @@ func (p *Processor) updateEndpointOneByOne(serviceId uint32, epsUpdate []cache.E } // add ek first to another priority group - if err, _ := p.addWorkloadToService(&sKey, &sValue, ev.BackendUid, prio); err != nil { + if _, err := p.addWorkloadToService(&sKey, &sValue, ev.BackendUid, prio); err != nil { return fmt.Errorf("update endpoint %d priority to %d failed: %v", ev.BackendUid, prio, err) } - epKeys := []bpfcache.EndpointKey{ek} + epKeys := []bpf.EndpointKey{ek} // delete ek from old priority group if err := p.deleteEndpointRecords(epKeys); err != nil { return fmt.Errorf("delete endpoint %d from old priority group %d failed: %v", ev.BackendUid, ek.Prio, err) @@ -747,7 +748,7 @@ func (p *Processor) updateServiceMap(service, oldService *workloadapi.Service) e if strings.Contains(serviceName, "waypoint") { newServiceInfo.TargetPort[i] = nets.ConvertPortToBigEndian(KmeshWaypointPort) } else if port.TargetPort == 0 { - // NOTE: Target port could be unset in servicen entry, in which case it should + // NOTE: Target port could be unset in service entry, in which case it should // be consistent with the Service Port. newServiceInfo.TargetPort[i] = nets.ConvertPortToBigEndian(port.ServicePort) } else { @@ -835,7 +836,7 @@ func (p *Processor) handleService(service *workloadapi.Service) error { // Preprocess service, remove the waypoint from waypoint service, otherwise it will fall into a loop in bpf if service.Waypoint != nil && service.GetWaypoint().GetAddress() != nil && len(service.Addresses) != 0 { // Currently istiod only set the waypoint address to the first address of the service - // When waypoints of different granularities are deployed together, the only waypoint service to be determined + // when waypoints of different granularities are deployed together, the only waypoint service to be determined // is whether it contains port 15021, ref: https://github.com/kmesh-net/kmesh/issues/691 // TODO: remove when upstream istiod will not set the waypoint address for itself if slices.Equal(service.GetWaypoint().GetAddress().Address, service.Addresses[0].Address) || containsPort(15021) { @@ -888,12 +889,12 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis // sort resources, first process services, then workload var services []*workloadapi.Service var workloads []*workloadapi.Workload + for _, resource := range rsp.GetResources() { address := &workloadapi.Address{} if err = anypb.UnmarshalTo(resource.Resource, address, proto.UnmarshalOptions{}); err != nil { continue } - switch address.GetType().(type) { case *workloadapi.Address_Workload: workloads = append(workloads, address.GetWorkload()) @@ -933,11 +934,39 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, for _, workload := range workloads { // TODO: Kmesh supports ServiceEntry + // log.Warnf("workload: %s/%s addresses: %v", workload.Namespace, workload.Name, workload.GetAddresses()) if workload.GetAddresses() == nil { - log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) - continue + // Non-ServiceEntry workload with nil addresses will be ignored. + uid := workload.GetUid() + if !strings.Contains(uid, "ServiceEntry") { + log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) + continue + } else { + log.Warnf("workload: %s/%s addresses is nil, workload info: %+v", workload.Namespace, workload.Name, workload) + // workload from service entry need address resolving + if p.DnsResolverChan != nil { + p.DnsResolverChan <- workloads + } + go func() { + maxRetries := 30 + for range maxRetries { + workload := p.WorkloadCache.GetWorkloadByUid(uid) + address := workload.GetAddresses() + if address != nil { + log.Infof("workload: %s/%s addresses resolved: %v", workload.Namespace, workload.Name, address) + if err := p.handleWorkload(workload); err != nil { + log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + } + break + } else { + log.Warnf("workload: %s/%s addresses is still nil, retrying...", workload.Namespace, workload.Name) + } + time.Sleep(1 * time.Second) + } + }() + // wait for the service entry to be resolved + } } - if err := p.handleWorkload(workload); err != nil { log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) } @@ -988,11 +1017,11 @@ func (p *Processor) handleRemovedAddressesDuringRestart() { func (p *Processor) handleAuthorizationTypeResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) error { if rbac == nil { - return fmt.Errorf("Rbac module uninitialized") + return fmt.Errorf("rbac module uninitialized") } // update resource for _, resource := range rsp.GetResources() { - authPolicy := &security.Authorization{} + authPolicy := &security_v2.Authorization{} if err := anypb.UnmarshalTo(resource.Resource, authPolicy, proto.UnmarshalOptions{}); err != nil { log.Errorf("unmarshal failed, err: %v", err) continue From 2ae761bf2fa5fafb1b9bf6d4556d28420643c89a Mon Sep 17 00:00:00 2001 From: Tom Date: Thu, 10 Jul 2025 08:57:15 +0000 Subject: [PATCH 02/14] update DNS refresh rate Signed-off-by: Tom --- pkg/controller/workload/dns.go | 24 +++++++++++++++++-- pkg/controller/workload/workload_processor.go | 23 ++++++++++-------- 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index cce4639ed..c03f3c610 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -1,3 +1,19 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package workload import ( @@ -12,6 +28,10 @@ import ( "kmesh.net/kmesh/pkg/dns" ) +const ( + WorkloadDnsRefreshRate = 200 * time.Millisecond // 200ms, used for workload dns refresh rate +) + type dnsController struct { workloadsChan chan []*workloadapi.Workload cache cache.WorkloadCache @@ -70,7 +90,7 @@ func (r *dnsController) processDomains(workload []*workloadapi.Workload) { r.pendingHostnames[workloadName] = []string{hostname} r.workloadCache[hostname] = &pendingResolveDomain{ Workloads: []*workloadapi.Workload{workload}, - RefreshRate: 15 * time.Second, + RefreshRate: WorkloadDnsRefreshRate, } } @@ -116,7 +136,7 @@ func (r *dnsController) refreshWorker(stop <-chan struct{}) { func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, domain string, addrs []string) { isWorkerUpdate := false - if pendingDomain == nil || addrs == nil{ + if pendingDomain == nil || addrs == nil { return } log.Infof("dnsController updateWorkloads: pendingDomain %v, domain %s, addrs %v", pendingDomain, domain, addrs) diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index a04d07492..d457fffdc 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -948,23 +948,26 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, p.DnsResolverChan <- workloads } go func() { - maxRetries := 30 + maxRetries := 50 + var address [][]byte for range maxRetries { workload := p.WorkloadCache.GetWorkloadByUid(uid) - address := workload.GetAddresses() - if address != nil { - log.Infof("workload: %s/%s addresses resolved: %v", workload.Namespace, workload.Name, address) - if err := p.handleWorkload(workload); err != nil { - log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) - } + if address = workload.GetAddresses(); address != nil { break } else { - log.Warnf("workload: %s/%s addresses is still nil, retrying...", workload.Namespace, workload.Name) + time.Sleep(WorkloadDnsRefreshRate) } - time.Sleep(1 * time.Second) + } + if address != nil { + log.Infof("workload: %s/%s addresses resolved: %v", workload.Namespace, workload.Name, address) + if err := p.handleWorkload(workload); err != nil { + log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + } + } else { + log.Warnf("workload: %s/%s addresses is still nil after %d retries, skipping", workload.Namespace, workload.Name, maxRetries) } }() - // wait for the service entry to be resolved + } } if err := p.handleWorkload(workload); err != nil { From 210826805873c3cfb83fffb0e472ef0a9c82706f Mon Sep 17 00:00:00 2001 From: Tom Date: Sun, 13 Jul 2025 13:14:06 +0000 Subject: [PATCH 03/14] fix golangci-lint Signed-off-by: Tom --- pkg/controller/workload/dns.go | 7 +++++-- pkg/controller/workload/workload_controller.go | 2 +- pkg/controller/workload/workload_processor.go | 1 - 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index c03f3c610..a4821fefb 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -23,6 +23,7 @@ import ( "time" "google.golang.org/protobuf/proto" + "kmesh.net/kmesh/api/v2/workloadapi" "kmesh.net/kmesh/pkg/controller/workload/cache" "kmesh.net/kmesh/pkg/dns" @@ -153,7 +154,10 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom } if isWorkerUpdate { - // w.cache.Flush() + log.Info("some workloads has been updated") + // TODO: flush the bpf map + // r.cache.Flush() + return } } @@ -206,7 +210,6 @@ func getPendingResolveDomain(workloads []*workloadapi.Workload) map[string]any { if v, ok := domains[hostname]; ok { v.(*pendingResolveDomain).Workloads = append(v.(*pendingResolveDomain).Workloads, workload) } else { - domainWithRefreshRate := &pendingResolveDomain{ Workloads: []*workloadapi.Workload{workload}, RefreshRate: 15 * time.Second, diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 0ce36fdd8..784ce1923 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -160,7 +160,7 @@ func (c *Controller) HandleWorkloadStream() error { return fmt.Errorf("stream recv failed, %s", err) } - // c.dnsResolverController.newWorkloadCache() + c.dnsResolverController.newWorkloadCache() c.Processor.processWorkloadResponse(rspDelta, c.Rbac) if err = c.Stream.Send(c.Processor.ack); err != nil { diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index d457fffdc..6c58cc008 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -967,7 +967,6 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, log.Warnf("workload: %s/%s addresses is still nil after %d retries, skipping", workload.Namespace, workload.Name, maxRetries) } }() - } } if err := p.handleWorkload(workload); err != nil { From 9d82997471ab3c95f70812483158c28bea6fd999 Mon Sep 17 00:00:00 2001 From: Tom Date: Sat, 2 Aug 2025 08:02:25 +0000 Subject: [PATCH 04/14] update workload dnscontroller Signed-off-by: Tom --- pkg/controller/workload/dns.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index a4821fefb..e9089a26f 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -81,18 +81,27 @@ func (r *dnsController) processWorkloads() { } } -func (r *dnsController) processDomains(workload []*workloadapi.Workload) { - domains := getPendingResolveDomain(workload) +func (r *dnsController) processDomains(workloads []*workloadapi.Workload) { + domains := getPendingResolveDomain(workloads) // store all pending hostnames of clusters in pendingHostnames - for _, workload := range workload { + for _, workload := range workloads { workloadName := workload.GetName() hostname := workload.GetHostname() - r.pendingHostnames[workloadName] = []string{hostname} - r.workloadCache[hostname] = &pendingResolveDomain{ - Workloads: []*workloadapi.Workload{workload}, - RefreshRate: WorkloadDnsRefreshRate, + if _, ok := r.pendingHostnames[workloadName]; !ok { + r.pendingHostnames[workloadName] = []string{} + } + r.pendingHostnames[workloadName] = append(r.pendingHostnames[workloadName], hostname) + if _, ok := r.workloadCache[hostname]; !ok { + // Initialize the newly added hostname + r.workloadCache[hostname] = &pendingResolveDomain{ + Workloads: make([]*workloadapi.Workload, 0), + RefreshRate: WorkloadDnsRefreshRate, + } } + r.workloadCache[hostname].Workloads = append( + r.workloadCache[hostname].Workloads, workload, + ) } // delete any scheduled re-resolve for domains we no longer care about @@ -167,7 +176,6 @@ func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, dom addressesOfHostname := make(map[string][]string) for _, hostName := range hostNames { - // log.Infof("overwriteDnsWorkload: checking hostname %s for workload %s with domain %s", hostName, workload.ResourceName(), domain) addresses := r.dnsResolver.GetDNSAddresses(hostName) // There are hostnames in this Cluster that are not resolved. if addresses != nil { From e399e22b4432ca83f8137c3adcb1d98c1b1ab75c Mon Sep 17 00:00:00 2001 From: Tom Date: Tue, 12 Aug 2025 11:40:29 +0000 Subject: [PATCH 05/14] fix up with reviews Signed-off-by: Tom --- README-zh.md | 4 +- pkg/cni/install.go | 4 +- pkg/controller/client.go | 5 +- pkg/controller/controller.go | 3 +- pkg/controller/workload/dns.go | 34 +++++----- .../workload/workload_controller.go | 12 ++-- pkg/controller/workload/workload_processor.go | 62 +++++++++---------- 7 files changed, 55 insertions(+), 69 deletions(-) diff --git a/README-zh.md b/README-zh.md index a628d16a9..24fe6660c 100644 --- a/README-zh.md +++ b/README-zh.md @@ -115,8 +115,8 @@ Kmesh创新性的提出将流量治理下沉OS,在数据路径上无需经过 time="2024-02-19T10:16:53Z" level=info msg="bpf Start successful" subsys=manager time="2024-02-19T10:16:53Z" level=info msg="controller Start successful" subsys=manager time="2024-02-19T10:16:53Z" level=info msg="command StartServer successful" subsys=manager - time="2024-02-19T10:16:53Z" level=info msg="start write CNI config\n" subsys="cni installer" - time="2024-02-19T10:16:53Z" level=info msg="kmesh cni use chained\n" subsys="cni installer" + time="2024-02-19T10:16:53Z" level=info msg="start write CNI config" subsys="cni installer" + time="2024-02-19T10:16:53Z" level=info msg="kmesh cni use chained" subsys="cni installer" time="2024-02-19T10:16:54Z" level=info msg="Copied /usr/bin/kmesh-cni to /opt/cni/bin." subsys="cni installer" time="2024-02-19T10:16:54Z" level=info msg="kubeconfig either does not exist or is out of date, writing a new one" subsys="cni installer" time="2024-02-19T10:16:54Z" level=info msg="wrote kubeconfig file /etc/cni/net.d/kmesh-cni-kubeconfig" subsys="cni installer" diff --git a/pkg/cni/install.go b/pkg/cni/install.go index 0054c1e5f..c57a69b5c 100644 --- a/pkg/cni/install.go +++ b/pkg/cni/install.go @@ -35,13 +35,13 @@ func (i *Installer) addCniConfig() error { if i.CniConfigChained { // "chained" is an cni type // information: www.cni.dev/docs/spec/#overview-1 - log.Infof("kmesh cni use chained\n") + log.Infof("kmesh cni use chained") err = i.chainedKmeshCniPlugin(i.Mode, i.CniMountNetEtcDIR) if err != nil { return err } } else { - log.Error("currently kmesh only support chained cni mode\n") + log.Error("currently kmesh only support chained cni mode") } return nil } diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 8ea1b33a4..6f6c9a23a 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -56,9 +56,10 @@ func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWork xdsConfig: config.GetConfig(mode), } - if mode == constants.DualEngineMode { + switch mode { + case constants.DualEngineMode: client.WorkloadController = workload.NewController(bpfWorkload, enableMonitoring, enableProfiling) - } else if mode == constants.KernelNativeMode { + case constants.KernelNativeMode: client.AdsController = ads.NewController(bpfAds) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 65e89f2d4..3f27cb93e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -162,8 +162,7 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling) if c.client.WorkloadController != nil { - c.client.WorkloadController.StartDnsController(stopCh) - c.client.WorkloadController.Run(ctx) + c.client.WorkloadController.Run(ctx, stopCh) } else { c.client.AdsController.StartDnsController(stopCh) } diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index e9089a26f..e70d037a4 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -31,6 +31,7 @@ import ( const ( WorkloadDnsRefreshRate = 200 * time.Millisecond // 200ms, used for workload dns refresh rate + maxDNSRefreshRetry = 3 // max retry for dns refresh ) type dnsController struct { @@ -40,7 +41,7 @@ type dnsController struct { // store the copy of pendingResolveWorkload. workloadCache map[string]*pendingResolveDomain // store all pending hostnames in the workloads - pendingHostnames map[string][]string + pendingHostnames map[string]string sync.RWMutex } @@ -56,13 +57,15 @@ func NewDnsController(cache cache.WorkloadCache) (*dnsController, error) { if err != nil { return nil, err } - return &dnsController{ + dnsController := &dnsController{ workloadsChan: make(chan []*workloadapi.Workload), cache: cache, dnsResolver: resolver, workloadCache: make(map[string]*pendingResolveDomain), - pendingHostnames: make(map[string][]string), - }, nil + pendingHostnames: make(map[string]string), + } + dnsController.newWorkloadCache() + return dnsController, nil } func (r *dnsController) Run(stopCh <-chan struct{}) { @@ -88,10 +91,7 @@ func (r *dnsController) processDomains(workloads []*workloadapi.Workload) { for _, workload := range workloads { workloadName := workload.GetName() hostname := workload.GetHostname() - if _, ok := r.pendingHostnames[workloadName]; !ok { - r.pendingHostnames[workloadName] = []string{} - } - r.pendingHostnames[workloadName] = append(r.pendingHostnames[workloadName], hostname) + r.pendingHostnames[workloadName] = hostname if _, ok := r.workloadCache[hostname]; !ok { // Initialize the newly added hostname r.workloadCache[hostname] = &pendingResolveDomain{ @@ -133,7 +133,7 @@ func (r *dnsController) refreshWorker(stop <-chan struct{}) { pendingDomain := r.getWorkloadsByDomain(domain) // log.Infof("dnsController refreshWorker: domain %s, pendingDomain %v", domain, pendingDomain) addrs := r.dnsResolver.GetDNSAddresses(domain) - maxRetry := 3 + maxRetry := maxDNSRefreshRetry for range maxRetry { if len(addrs) > 0 { r.updateWorkloads(pendingDomain, domain, addrs) @@ -172,17 +172,13 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) { ready := true - hostNames := r.pendingHostnames[workload.GetName()] + hostName := r.pendingHostnames[workload.GetName()] addressesOfHostname := make(map[string][]string) - for _, hostName := range hostNames { - addresses := r.dnsResolver.GetDNSAddresses(hostName) - // There are hostnames in this Cluster that are not resolved. - if addresses != nil { - addressesOfHostname[hostName] = addresses - } else { - ready = false - } + if addresses := r.dnsResolver.GetDNSAddresses(hostName); addresses != nil { + addressesOfHostname[hostName] = addresses + } else { + ready = false } if ready { @@ -220,7 +216,7 @@ func getPendingResolveDomain(workloads []*workloadapi.Workload) map[string]any { } else { domainWithRefreshRate := &pendingResolveDomain{ Workloads: []*workloadapi.Workload{workload}, - RefreshRate: 15 * time.Second, + RefreshRate: WorkloadDnsRefreshRate, } domains[hostname] = domainWithRefreshRate } diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 784ce1923..697bb5fd7 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -75,7 +75,7 @@ func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfM return c } -func (c *Controller) Run(ctx context.Context) error { +func (c *Controller) Run(ctx context.Context, stopCh <-chan struct{}) error { if err := c.Processor.PrepareDNSProxy(); err != nil { log.Errorf("failed to prepare for dns proxy, err: %+v", err) return err @@ -102,6 +102,9 @@ func (c *Controller) Run(ctx context.Context) error { if c.OperationMetricController != nil { go c.OperationMetricController.Run(ctx, c.bpfWorkloadObj.SockConn.KmPerfInfo) } + if c.dnsResolverController != nil { + go c.dnsResolverController.Run(stopCh) + } return nil } @@ -160,7 +163,6 @@ func (c *Controller) HandleWorkloadStream() error { return fmt.Errorf("stream recv failed, %s", err) } - c.dnsResolverController.newWorkloadCache() c.Processor.processWorkloadResponse(rspDelta, c.Rbac) if err = c.Stream.Send(c.Processor.ack); err != nil { @@ -207,9 +209,3 @@ func (c *Controller) SetConnectionMetricTrigger(enable bool) { func (c *Controller) GetConnectionMetricTrigger() bool { return c.MetricController.EnableConnectionMetric.Load() } - -func (c *Controller) StartDnsController(stopCh <-chan struct{}) { - if c.dnsResolverController != nil { - c.dnsResolverController.Run(stopCh) - } -} diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 6c58cc008..bf5118a08 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -934,43 +934,37 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, for _, workload := range workloads { // TODO: Kmesh supports ServiceEntry - // log.Warnf("workload: %s/%s addresses: %v", workload.Namespace, workload.Name, workload.GetAddresses()) - if workload.GetAddresses() == nil { - // Non-ServiceEntry workload with nil addresses will be ignored. + if workload.GetAddresses() != nil { + if err := p.handleWorkload(workload); err != nil { + log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + } + } else { + log.Warnf("workload hhh: %s/%s addresses is nil, workload info: %+v", workload.Namespace, workload.Name, workload) + // workload from service entry need address resolving + if p.DnsResolverChan != nil { + p.DnsResolverChan <- workloads + } uid := workload.GetUid() - if !strings.Contains(uid, "ServiceEntry") { - log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) - continue - } else { - log.Warnf("workload: %s/%s addresses is nil, workload info: %+v", workload.Namespace, workload.Name, workload) - // workload from service entry need address resolving - if p.DnsResolverChan != nil { - p.DnsResolverChan <- workloads - } - go func() { - maxRetries := 50 - var address [][]byte - for range maxRetries { - workload := p.WorkloadCache.GetWorkloadByUid(uid) - if address = workload.GetAddresses(); address != nil { - break - } else { - time.Sleep(WorkloadDnsRefreshRate) - } - } - if address != nil { - log.Infof("workload: %s/%s addresses resolved: %v", workload.Namespace, workload.Name, address) - if err := p.handleWorkload(workload); err != nil { - log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) - } + go func() { + maxRetries := 50 + var address [][]byte + for range maxRetries { + workload := p.WorkloadCache.GetWorkloadByUid(uid) + if address = workload.GetAddresses(); address != nil { + break } else { - log.Warnf("workload: %s/%s addresses is still nil after %d retries, skipping", workload.Namespace, workload.Name, maxRetries) + time.Sleep(WorkloadDnsRefreshRate) } - }() - } - } - if err := p.handleWorkload(workload); err != nil { - log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + } + if address != nil { + log.Infof("workload: %s/%s addresses resolved: %v", workload.Namespace, workload.Name, address) + if err := p.handleWorkload(workload); err != nil { + log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + } + } else { + log.Warnf("workload: %s/%s addresses is still nil after %d retries, skipping", workload.Namespace, workload.Name, maxRetries) + } + }() } } } From f18663aace3ead8b90180e84a912f25303ba245f Mon Sep 17 00:00:00 2001 From: Tom Date: Fri, 5 Sep 2025 09:05:44 +0000 Subject: [PATCH 06/14] fix panic & use chan to callback Signed-off-by: Tom --- pkg/controller/workload/dns.go | 155 +++++++++--------- .../workload/workload_controller.go | 1 + pkg/controller/workload/workload_processor.go | 42 ++--- 3 files changed, 90 insertions(+), 108 deletions(-) diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index e70d037a4..5c543c18e 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -31,13 +31,15 @@ import ( const ( WorkloadDnsRefreshRate = 200 * time.Millisecond // 200ms, used for workload dns refresh rate - maxDNSRefreshRetry = 3 // max retry for dns refresh ) type dnsController struct { - workloadsChan chan []*workloadapi.Workload - cache cache.WorkloadCache - dnsResolver *dns.DNSResolver + cache cache.WorkloadCache + dnsResolver *dns.DNSResolver + + workloadsChan chan *workloadapi.Workload + ResolvedDomainChanMap map[string]chan *workloadapi.Workload + // store the copy of pendingResolveWorkload. workloadCache map[string]*pendingResolveDomain // store all pending hostnames in the workloads @@ -48,7 +50,7 @@ type dnsController struct { // pending resolve domain info of Dual-Engine Mode, // workload is used for create the apiworkload type pendingResolveDomain struct { - Workloads []*workloadapi.Workload + Workload []*workloadapi.Workload RefreshRate time.Duration } @@ -58,11 +60,12 @@ func NewDnsController(cache cache.WorkloadCache) (*dnsController, error) { return nil, err } dnsController := &dnsController{ - workloadsChan: make(chan []*workloadapi.Workload), - cache: cache, - dnsResolver: resolver, - workloadCache: make(map[string]*pendingResolveDomain), - pendingHostnames: make(map[string]string), + cache: cache, + dnsResolver: resolver, + workloadsChan: make(chan *workloadapi.Workload), + ResolvedDomainChanMap: make(map[string]chan *workloadapi.Workload), + workloadCache: make(map[string]*pendingResolveDomain), + pendingHostnames: make(map[string]string), } dnsController.newWorkloadCache() return dnsController, nil @@ -71,46 +74,40 @@ func NewDnsController(cache cache.WorkloadCache) (*dnsController, error) { func (r *dnsController) Run(stopCh <-chan struct{}) { go r.dnsResolver.StartDnsResolver(stopCh) go r.refreshWorker(stopCh) - go r.processWorkloads() + go func() { + for workload := range r.workloadsChan { + r.processDomains(workload) + } + }() go func() { <-stopCh close(r.workloadsChan) }() } -func (r *dnsController) processWorkloads() { - for workloads := range r.workloadsChan { - r.processDomains(workloads) - } -} - -func (r *dnsController) processDomains(workloads []*workloadapi.Workload) { - domains := getPendingResolveDomain(workloads) - - // store all pending hostnames of clusters in pendingHostnames - for _, workload := range workloads { - workloadName := workload.GetName() - hostname := workload.GetHostname() - r.pendingHostnames[workloadName] = hostname - if _, ok := r.workloadCache[hostname]; !ok { - // Initialize the newly added hostname - r.workloadCache[hostname] = &pendingResolveDomain{ - Workloads: make([]*workloadapi.Workload, 0), - RefreshRate: WorkloadDnsRefreshRate, - } +func (r *dnsController) processDomains(workload *workloadapi.Workload) { + domains := getPendingResolveDomain(workload) + + workloadName := workload.GetName() + hostname := workload.GetHostname() + r.pendingHostnames[workloadName] = hostname + if _, ok := r.workloadCache[hostname]; !ok { + // Initialize the newly added hostname + r.workloadCache[hostname] = &pendingResolveDomain{ + Workload: make([]*workloadapi.Workload, 0), + RefreshRate: WorkloadDnsRefreshRate, } - r.workloadCache[hostname].Workloads = append( - r.workloadCache[hostname].Workloads, workload, - ) } + r.workloadCache[hostname].Workload = append( + r.workloadCache[hostname].Workload, workload, + ) // delete any scheduled re-resolve for domains we no longer care about r.dnsResolver.RemoveUnwatchDomain(domains) // update workloadCache with pendingResolveWorkload for k, v := range domains { - addresses := r.dnsResolver.GetDNSAddresses(k) - if addresses != nil { + if addresses := r.dnsResolver.GetDNSAddresses(k); addresses != nil { go r.updateWorkloads(v.(*pendingResolveDomain), k, addresses) } else { // Initialize the newly added hostname @@ -131,31 +128,37 @@ func (r *dnsController) refreshWorker(stop <-chan struct{}) { return case domain := <-r.dnsResolver.DnsChan: pendingDomain := r.getWorkloadsByDomain(domain) - // log.Infof("dnsController refreshWorker: domain %s, pendingDomain %v", domain, pendingDomain) addrs := r.dnsResolver.GetDNSAddresses(domain) - maxRetry := maxDNSRefreshRetry - for range maxRetry { - if len(addrs) > 0 { - r.updateWorkloads(pendingDomain, domain, addrs) - } - time.Sleep(1 * time.Second) - } + r.updateWorkloads(pendingDomain, domain, addrs) } } } func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, domain string, addrs []string) { - isWorkerUpdate := false if pendingDomain == nil || addrs == nil { return } - log.Infof("dnsController updateWorkloads: pendingDomain %v, domain %s, addrs %v", pendingDomain, domain, addrs) + r.Lock() + defer r.Unlock() - for _, workload := range pendingDomain.Workloads { + isWorkerUpdate := false + for _, workload := range pendingDomain.Workload { if ready, newWorkload := r.overwriteDnsWorkload(workload, domain, addrs); ready { - // log.Infof("dnsController update cache for workload %s with addresses %v", workload.ResourceName(), addrs) - if r.cache.GetWorkloadByUid(workload.GetUid()) != nil { - r.cache.AddOrUpdateWorkload(newWorkload) + // uid := workload.GetUid() + // if r.cache.GetWorkloadByUid(uid) != nil { + // r.cache.AddOrUpdateWorkload(newWorkload) + // delete(r.workloadCache, domain) + // isWorkerUpdate = true + // } + // if _, ok := r.workloadCache[domain]; !ok { + // continue + // } + uid := newWorkload.GetUid() + if _, ok := r.ResolvedDomainChanMap[uid]; ok { + r.ResolvedDomainChanMap[uid] <- newWorkload + log.Infof("workload %s/%s/%s addresses updated to %v", newWorkload.Namespace, newWorkload.Name, uid, newWorkload.Addresses) + close(r.ResolvedDomainChanMap[uid]) + delete(r.ResolvedDomainChanMap, uid) delete(r.workloadCache, domain) isWorkerUpdate = true } @@ -163,7 +166,7 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom } if isWorkerUpdate { - log.Info("some workloads has been updated") + // log.Info("some workloads has been updated") // TODO: flush the bpf map // r.cache.Flush() return @@ -171,55 +174,43 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom } func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) { - ready := true hostName := r.pendingHostnames[workload.GetName()] - addressesOfHostname := make(map[string][]string) if addresses := r.dnsResolver.GetDNSAddresses(hostName); addresses != nil { - addressesOfHostname[hostName] = addresses - } else { - ready = false - } - - if ready { - // log.Infof("overwriteDnsWorkload ready for workload %s with domain %s", workload.ResourceName(), domain) newWorkload := cloneWorkload(workload) for _, addr := range addrs { if ip := net.ParseIP(addr); ip != nil && ip.To4() != nil { newWorkload.Addresses = append(newWorkload.Addresses, netip.MustParseAddr(addr).AsSlice()) } } - - return ready, newWorkload + return true, newWorkload } - return ready, nil + return false, nil } -func getPendingResolveDomain(workloads []*workloadapi.Workload) map[string]any { +func getPendingResolveDomain(workload *workloadapi.Workload) map[string]any { domains := make(map[string]any) - for _, workload := range workloads { - hostname := workload.GetHostname() - if hostname == "" { - continue - } + hostname := workload.GetHostname() + if hostname == "" { + return domains + } - if _, err := netip.ParseAddr(hostname); err == nil { - // This is an ip address - continue - } + if _, err := netip.ParseAddr(hostname); err == nil { + // This is an ip address + return domains + } - // log.Infof("getPendingResolveDomain: processing workload %s with hostname %s", workload.ResourceName(), hostname) - if v, ok := domains[hostname]; ok { - v.(*pendingResolveDomain).Workloads = append(v.(*pendingResolveDomain).Workloads, workload) - } else { - domainWithRefreshRate := &pendingResolveDomain{ - Workloads: []*workloadapi.Workload{workload}, - RefreshRate: WorkloadDnsRefreshRate, - } - domains[hostname] = domainWithRefreshRate + // log.Infof("getPendingResolveDomain: processing workload %s with hostname %s", workload.ResourceName(), hostname) + if v, ok := domains[hostname]; ok { + v.(*pendingResolveDomain).Workload = append(v.(*pendingResolveDomain).Workload, workload) + } else { + domainWithRefreshRate := &pendingResolveDomain{ + Workload: []*workloadapi.Workload{workload}, + RefreshRate: WorkloadDnsRefreshRate, } + domains[hostname] = domainWithRefreshRate } return domains diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 697bb5fd7..5c5907e94 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -56,6 +56,7 @@ func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfM return nil } processor.DnsResolverChan = dnsResolverController.workloadsChan + processor.ResolvedDomainChanMap = dnsResolverController.ResolvedDomainChanMap c := &Controller{ dnsResolverController: dnsResolverController, Processor: processor, diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index bf5118a08..b0316480a 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -23,7 +23,6 @@ import ( "sort" "strings" "sync" - "time" service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/protobuf/proto" @@ -63,7 +62,8 @@ type Processor struct { WaypointCache cache.WaypointCache locality bpf.LocalityCache - DnsResolverChan chan []*workloadapi.Workload + DnsResolverChan chan *workloadapi.Workload + ResolvedDomainChanMap map[string]chan *workloadapi.Workload once sync.Once authzOnce sync.Once @@ -591,9 +591,8 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error { // Because there is only one address in the workload, a direct comparison can be made to // determine whether the old data needs to be deleted or not. - if !slices.Equal(newWorkloadAddresses[0], oldWorkloadAddresses[0]) { - err := p.deleteFrontendByIp(oldWorkloadAddresses) - if err != nil { + if len(newWorkloadAddresses) > 0 && len(oldWorkloadAddresses) > 0 && !slices.Equal(newWorkloadAddresses[0], oldWorkloadAddresses[0]) { + if err := p.deleteFrontendByIp(oldWorkloadAddresses); err != nil { return fmt.Errorf("frontend map delete failed: %v", err) } } @@ -939,32 +938,23 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) } } else { - log.Warnf("workload hhh: %s/%s addresses is nil, workload info: %+v", workload.Namespace, workload.Name, workload) + log.Warnf("workload %s/%s addresses is nil, workload info: %+v", workload.Namespace, workload.Name, workload) // workload from service entry need address resolving if p.DnsResolverChan != nil { - p.DnsResolverChan <- workloads - } - uid := workload.GetUid() - go func() { - maxRetries := 50 - var address [][]byte - for range maxRetries { - workload := p.WorkloadCache.GetWorkloadByUid(uid) - if address = workload.GetAddresses(); address != nil { - break - } else { - time.Sleep(WorkloadDnsRefreshRate) - } - } - if address != nil { - log.Infof("workload: %s/%s addresses resolved: %v", workload.Namespace, workload.Name, address) - if err := p.handleWorkload(workload); err != nil { - log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + uid := workload.GetUid() + p.ResolvedDomainChanMap[uid] = make(chan *workloadapi.Workload) + p.DnsResolverChan <- workload + log.Infof("wait for workload %s/%s/%s address resolving", workload.Namespace, workload.Name, uid) + newWorkload := <-p.ResolvedDomainChanMap[uid] + if address := newWorkload.GetAddresses(); address != nil { + log.Infof("workload: %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, address) + if err := p.handleWorkload(newWorkload); err != nil { + log.Errorf("handle workload %s failed, err: %v", newWorkload.ResourceName(), err) } } else { - log.Warnf("workload: %s/%s addresses is still nil after %d retries, skipping", workload.Namespace, workload.Name, maxRetries) + log.Warnf("workload: %s/%s resolved addresses is nil, skip handling", newWorkload.Namespace, newWorkload.Name) } - }() + } } } } From 82a45093b0502e4ff95bf938f2123735e910fe70 Mon Sep 17 00:00:00 2001 From: Tom Date: Sat, 6 Sep 2025 05:29:10 +0000 Subject: [PATCH 07/14] fix merge conflict Signed-off-by: Tom --- pkg/controller/controller.go | 4 +- .../workload/workload_controller.go | 2 +- pkg/controller/workload/workload_processor.go | 46 +++++++++---------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3f27cb93e..4e5bf1b2d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -162,7 +162,9 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling) if c.client.WorkloadController != nil { - c.client.WorkloadController.Run(ctx, stopCh) + if err := c.client.WorkloadController.Run(ctx, stopCh); err != nil { + return fmt.Errorf("failed to start workload controller: %+v", err) + } } else { c.client.AdsController.StartDnsController(stopCh) } diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 5c5907e94..db7b08611 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -40,12 +40,12 @@ var log = logger.NewLoggerScope("workload_controller") type Controller struct { Stream discoveryv3.AggregatedDiscoveryService_DeltaAggregatedResourcesClient Processor *Processor - dnsResolverController *dnsController Rbac *auth.Rbac MetricController *telemetry.MetricController MapMetricController *telemetry.MapMetricController OperationMetricController *telemetry.BpfProgMetric bpfWorkloadObj *bpfwl.BpfWorkload + dnsResolverController *dnsController } func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfMonitor bool) *Controller { diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index b0316480a..50738137e 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -62,9 +62,6 @@ type Processor struct { WaypointCache cache.WaypointCache locality bpf.LocalityCache - DnsResolverChan chan *workloadapi.Workload - ResolvedDomainChanMap map[string]chan *workloadapi.Workload - once sync.Once authzOnce sync.Once @@ -75,6 +72,9 @@ type Processor struct { authzRespOnce sync.Once handlers map[string][]func(resp *service_discovery_v3.DeltaDiscoveryResponse) error + + DnsResolverChan chan *workloadapi.Workload + ResolvedDomainChanMap map[string]chan *workloadapi.Workload } func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { @@ -592,7 +592,8 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error { // Because there is only one address in the workload, a direct comparison can be made to // determine whether the old data needs to be deleted or not. if len(newWorkloadAddresses) > 0 && len(oldWorkloadAddresses) > 0 && !slices.Equal(newWorkloadAddresses[0], oldWorkloadAddresses[0]) { - if err := p.deleteFrontendByIp(oldWorkloadAddresses); err != nil { + err := p.deleteFrontendByIp(oldWorkloadAddresses) + if err != nil { return fmt.Errorf("frontend map delete failed: %v", err) } } @@ -933,29 +934,26 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, for _, workload := range workloads { // TODO: Kmesh supports ServiceEntry - if workload.GetAddresses() != nil { - if err := p.handleWorkload(workload); err != nil { - log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + if workload.GetAddresses() == nil { + log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) + if p.DnsResolverChan == nil { + continue } - } else { - log.Warnf("workload %s/%s addresses is nil, workload info: %+v", workload.Namespace, workload.Name, workload) - // workload from service entry need address resolving - if p.DnsResolverChan != nil { - uid := workload.GetUid() - p.ResolvedDomainChanMap[uid] = make(chan *workloadapi.Workload) - p.DnsResolverChan <- workload - log.Infof("wait for workload %s/%s/%s address resolving", workload.Namespace, workload.Name, uid) - newWorkload := <-p.ResolvedDomainChanMap[uid] - if address := newWorkload.GetAddresses(); address != nil { - log.Infof("workload: %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, address) - if err := p.handleWorkload(newWorkload); err != nil { - log.Errorf("handle workload %s failed, err: %v", newWorkload.ResourceName(), err) - } - } else { - log.Warnf("workload: %s/%s resolved addresses is nil, skip handling", newWorkload.Namespace, newWorkload.Name) - } + uid := workload.GetUid() + p.ResolvedDomainChanMap[uid] = make(chan *workloadapi.Workload) + p.DnsResolverChan <- workload + log.Infof("wait for workload %s/%s/%s address resolving", workload.Namespace, workload.Name, uid) + newWorkload := <-p.ResolvedDomainChanMap[uid] + if address := newWorkload.GetAddresses(); address == nil { + log.Warnf("workload: %s/%s resolved addresses is nil, skip handling", newWorkload.Namespace, newWorkload.Name) + continue + } else { + log.Infof("workload: %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, address) } } + if err := p.handleWorkload(workload); err != nil { + log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) + } } } From aa706b5b8ee01d7133a021529a3437f9d89b9e48 Mon Sep 17 00:00:00 2001 From: Tom Date: Mon, 8 Sep 2025 09:51:04 +0000 Subject: [PATCH 08/14] fix e2e test Signed-off-by: Tom --- pkg/controller/workload/dns.go | 1 + .../workload/workload_controller.go | 2 +- pkg/controller/workload/workload_processor.go | 19 +++++++++++++------ 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index 5c543c18e..c043b281f 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -155,6 +155,7 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom // } uid := newWorkload.GetUid() if _, ok := r.ResolvedDomainChanMap[uid]; ok { + r.cache.AddOrUpdateWorkload(newWorkload) r.ResolvedDomainChanMap[uid] <- newWorkload log.Infof("workload %s/%s/%s addresses updated to %v", newWorkload.Namespace, newWorkload.Name, uid, newWorkload.Addresses) close(r.ResolvedDomainChanMap[uid]) diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index db7b08611..4ed6e9bd1 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -58,9 +58,9 @@ func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfM processor.DnsResolverChan = dnsResolverController.workloadsChan processor.ResolvedDomainChanMap = dnsResolverController.ResolvedDomainChanMap c := &Controller{ - dnsResolverController: dnsResolverController, Processor: processor, bpfWorkloadObj: bpfWorkload, + dnsResolverController: dnsResolverController, } // do some initialization when restart // restore endpoint index, otherwise endpoint number can double diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 50738137e..0994e1e37 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -23,6 +23,7 @@ import ( "sort" "strings" "sync" + "time" service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/protobuf/proto" @@ -933,7 +934,6 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, } for _, workload := range workloads { - // TODO: Kmesh supports ServiceEntry if workload.GetAddresses() == nil { log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) if p.DnsResolverChan == nil { @@ -943,14 +943,21 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, p.ResolvedDomainChanMap[uid] = make(chan *workloadapi.Workload) p.DnsResolverChan <- workload log.Infof("wait for workload %s/%s/%s address resolving", workload.Namespace, workload.Name, uid) - newWorkload := <-p.ResolvedDomainChanMap[uid] - if address := newWorkload.GetAddresses(); address == nil { - log.Warnf("workload: %s/%s resolved addresses is nil, skip handling", newWorkload.Namespace, newWorkload.Name) + select { + case <-time.After(3 * time.Second): + log.Warnf("address resolving for workload %s/%s/%s timeout, skip handling", workload.Namespace, workload.Name, uid) continue - } else { - log.Infof("workload: %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, address) + case newWorkload := <-p.ResolvedDomainChanMap[uid]: + if address := newWorkload.GetAddresses(); address == nil { + log.Warnf("workload: %s/%s resolved addresses is nil, skip handling", newWorkload.Namespace, newWorkload.Name) + continue + } else { + log.Infof("workload: %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, address) + workload = newWorkload + } } } + if err := p.handleWorkload(workload); err != nil { log.Errorf("handle workload %s failed, err: %v", workload.ResourceName(), err) } From 44af06306a1204822ef2c68e386a771ab5a1abab Mon Sep 17 00:00:00 2001 From: Tom Date: Mon, 29 Sep 2025 14:24:41 +0000 Subject: [PATCH 09/14] add UT for DNS workload processing and resolution Signed-off-by: Tom --- .gitignore | 1 + pkg/controller/workload/dns_test.go | 287 ++++++++++++++++++++++++++++ 2 files changed, 288 insertions(+) create mode 100644 pkg/controller/workload/dns_test.go diff --git a/.gitignore b/.gitignore index 55e6e051d..15bb9f4d6 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,4 @@ oncn-mda/build/ oncn-mda/deploy/ test/mugen-master/ +.github/copilot-instructions.md diff --git a/pkg/controller/workload/dns_test.go b/pkg/controller/workload/dns_test.go new file mode 100644 index 000000000..258f411b0 --- /dev/null +++ b/pkg/controller/workload/dns_test.go @@ -0,0 +1,287 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workload + +import ( + "net/netip" + "reflect" + "testing" + "time" + + "github.com/agiledragon/gomonkey/v2" + "github.com/stretchr/testify/assert" + "istio.io/istio/pkg/slices" + "istio.io/istio/pkg/test/util/retry" + + "kmesh.net/kmesh/api/v2/workloadapi" + "kmesh.net/kmesh/pkg/controller/workload/bpfcache" + "kmesh.net/kmesh/pkg/dns" +) + +func TestOverwriteDnsWorkload(t *testing.T) { + domain := "example.com" + addrs := []string{"192.168.1.1", "192.168.1.2", "10.0.0.1"} + workload := &workloadapi.Workload{ + Uid: "test-uid", + Name: "test-workload", + Hostname: domain, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + p.DnsResolverChan = dnsController.workloadsChan + + dnsController.pendingHostnames = map[string]string{ + workload.GetName(): domain, + } + + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "GetDNSAddresses", + func(_ *dns.DNSResolver, name string) []string { + return addrs + }) + + ready, newWorkload := dnsController.overwriteDnsWorkload(workload, domain, addrs) + assert.Equal(t, true, ready) + + if ready { + // Verify only IPv4 addresses are added based on current filtering logic + expectedAddrs := []string{"192.168.1.1", "192.168.1.2", "10.0.0.1"} + actualAddrs := make([]string, 0, len(newWorkload.Addresses)) + for _, addr := range newWorkload.Addresses { + ip, _ := netip.AddrFromSlice(addr) + actualAddrs = append(actualAddrs, ip.String()) + } + assert.Equal(t, expectedAddrs, actualAddrs) + } +} + +func TestHandleWorkloadsWithDns(t *testing.T) { + workload1 := &workloadapi.Workload{ + Uid: "test-uid-1", + Name: "test-workload-1", + Hostname: "foo.bar", + } + workload2 := &workloadapi.Workload{ + Uid: "test-uid-2", + Name: "test-workload-2", + Hostname: "foo.baz", + Addresses: [][]byte{netip.MustParseAddr("192.168.1.1").AsSlice()}, + } + + testcases := []struct { + name string + workloads []*workloadapi.Workload + expected []string + }{ + { + name: "add workloads with DNS hostname", + workloads: []*workloadapi.Workload{workload1, workload2}, + expected: []string{"foo.bar"}, + }, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + dnsController.Run(stopCh) + p.DnsResolverChan = dnsController.workloadsChan + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Send workloads that need DNS resolution + for _, wl := range tc.workloads { + if wl.GetAddresses() == nil { + dnsController.workloadsChan <- wl + } + } + + // Verify pending domains are correct + retry.UntilOrFail(t, func() bool { + domains := make([]string, 0) + for _, wl := range tc.workloads { + if wl.GetAddresses() == nil { + result := getPendingResolveDomain(wl) + for domain := range result { + domains = append(domains, domain) + } + } + } + return slices.EqualUnordered(tc.expected, domains) + }, retry.Timeout(1*time.Second)) + }) + } +} + +func TestGetPendingResolveDomain(t *testing.T) { + tests := []struct { + name string + workload *workloadapi.Workload + expected []string + }{ + { + name: "valid hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-1", + Name: "test-workload", + Hostname: "example.com", + }, + expected: []string{"example.com"}, + }, + { + name: "empty hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-2", + Name: "test-workload-2", + Hostname: "", + }, + expected: []string{}, + }, + { + name: "ip address as hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-3", + Name: "test-workload-3", + Hostname: "192.168.1.1", + }, + expected: []string{}, + }, + { + name: "ipv6 address as hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-4", + Name: "test-workload-4", + Hostname: "2001:db8::1", + }, + expected: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getPendingResolveDomain(tt.workload) + domains := make([]string, 0, len(result)) + for domain := range result { + domains = append(domains, domain) + } + assert.True(t, slices.EqualUnordered(tt.expected, domains)) + }) + } +} + +func TestDnsController_ProcessDomains(t *testing.T) { + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + + workload := &workloadapi.Workload{ + Uid: "test-uid", + Name: "test-workload", + Hostname: "example.com", + } + + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "GetDNSAddresses", + func(_ *dns.DNSResolver, domain string) []string { + return nil // Mock initial cache miss scenario + }) + patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "RemoveUnwatchDomain", + func(_ *dns.DNSResolver, domains map[string]any) { + // Mock domain unwatch removal + }) + patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "AddDomainInQueue", + func(_ *dns.DNSResolver, domainInfo *dns.DomainInfo, delay time.Duration) { + // Mock adding domain to queue + }) + + // Test domain processing + dnsController.processDomains(workload) + + // Verify workloadCache is updated + assert.Contains(t, dnsController.workloadCache, "example.com") + assert.Contains(t, dnsController.pendingHostnames, "test-workload") + assert.Equal(t, "example.com", dnsController.pendingHostnames["test-workload"]) + + pendingDomain := dnsController.workloadCache["example.com"] + assert.NotNil(t, pendingDomain) + assert.Equal(t, 1, len(pendingDomain.Workload)) + assert.Equal(t, workload, pendingDomain.Workload[0]) + assert.Equal(t, WorkloadDnsRefreshRate, pendingDomain.RefreshRate) +} + +func TestCloneWorkload(t *testing.T) { + tests := []struct { + name string + workload *workloadapi.Workload + wantNil bool + }{ + { + name: "nil workload", + workload: nil, + wantNil: true, + }, + { + name: "valid workload", + workload: &workloadapi.Workload{ + Uid: "test-uid", + Name: "test-workload", + Hostname: "example.com", + Addresses: [][]byte{ + netip.MustParseAddr("192.168.1.1").AsSlice(), + }, + }, + wantNil: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := cloneWorkload(tt.workload) + + if tt.wantNil { + assert.Nil(t, result) + } else { + assert.NotNil(t, result) + assert.Equal(t, tt.workload.Uid, result.Uid) + assert.Equal(t, tt.workload.Name, result.Name) + assert.Equal(t, tt.workload.Hostname, result.Hostname) + assert.Equal(t, len(tt.workload.Addresses), len(result.Addresses)) + + // Verify different object instances + assert.NotSame(t, tt.workload, result) + } + }) + } +} From a3bcb6ab2804d1af27be7fd2fa943647e34e6387 Mon Sep 17 00:00:00 2001 From: Tom Date: Tue, 14 Oct 2025 10:38:03 +0000 Subject: [PATCH 10/14] add e2e test Signed-off-by: Tom --- test/e2e/baseline_test.go | 64 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/test/e2e/baseline_test.go b/test/e2e/baseline_test.go index 5b5b2431b..c507b40ce 100644 --- a/test/e2e/baseline_test.go +++ b/test/e2e/baseline_test.go @@ -236,6 +236,70 @@ spec: }) } +// TestServiceEntryDNSResolution tests accessing a service through a fake hostname configured in ServiceEntry. +// This simulates external service access using DNS resolution. +func TestServiceEntryDNSResolution(t *testing.T) { + runTest(t, func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions) { + // Only test HTTP traffic + if opt.Scheme != scheme.HTTP { + return + } + // Need waypoint proxy for L7 processing + if !dst.Config().HasServiceAddressedWaypointProxy() { + return + } + // Skip IPv6 for now as it's not fully supported + if net.ParseIP(dst.Address()).To4() == nil { + return + } + + const ( + serviceEntryName = "external-svc-dns" + fakeHostname = "foo.bar.com" + ) + + // Use enrolled-to-kmesh service as the actual backend for ServiceEntry + // to avoid circular reference when dst is the service-with-waypoint itself + backendService := apps.EnrolledToKmesh[0].Config().Service + "." + apps.Namespace.Name() + ".svc.cluster.local" + servicePort := dst.Config().Ports.MustForName("http").ServicePort + + // Create ServiceEntry with fake hostname pointing to enrolled-to-kmesh service via DNS resolution + t.ConfigIstio().Eval(apps.Namespace.Name(), map[string]string{ + "ServiceEntryName": serviceEntryName, + "FakeHostname": fakeHostname, + "Backend": backendService, + "ServicePort": fmt.Sprintf("%d", servicePort), + }, `apiVersion: networking.istio.io/v1alpha3 +kind: ServiceEntry +metadata: + name: "{{.ServiceEntryName}}" +spec: + hosts: + - "{{.FakeHostname}}" + location: MESH_EXTERNAL + ports: + - number: {{.ServicePort}} + name: http + protocol: HTTP + resolution: DNS + endpoints: + - address: "{{.Backend}}" +`).ApplyOrFail(t) + + // Set waypoint for the ServiceEntry + SetWaypoint(t, apps.Namespace.Name(), serviceEntryName, "waypoint", ServiceEntry) + + // Test accessing the backend service through the fake hostname + opt = opt.DeepCopy() + opt.Count = 5 + opt.Timeout = time.Second * 10 + opt.Address = fakeHostname + opt.Port = echo.Port{ServicePort: servicePort} + opt.Check = check.OK() + src.CallOrFail(t, opt) + }) +} + func TestTrafficSplit(t *testing.T) { runTest(t, func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions) { // Need at least one waypoint proxy and HTTP From 132471200ab97ebc533029edd92b90c215dea968 Mon Sep 17 00:00:00 2001 From: Tom Date: Tue, 14 Oct 2025 14:36:56 +0000 Subject: [PATCH 11/14] fix with reviews Signed-off-by: Tom --- pkg/controller/workload/dns.go | 75 +++++++++----- pkg/controller/workload/dns_test.go | 98 ++++++++++++++----- pkg/controller/workload/workload_processor.go | 5 + 3 files changed, 134 insertions(+), 44 deletions(-) diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index c043b281f..7106be1f8 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -86,10 +86,21 @@ func (r *dnsController) Run(stopCh <-chan struct{}) { } func (r *dnsController) processDomains(workload *workloadapi.Workload) { + if workload == nil { + log.Warn("received nil workload in processDomains") + return + } + domains := getPendingResolveDomain(workload) + if len(domains) == 0 { + log.Debugf("no domains to resolve for workload %s/%s", workload.Namespace, workload.Name) + return + } workloadName := workload.GetName() hostname := workload.GetHostname() + + r.Lock() r.pendingHostnames[workloadName] = hostname if _, ok := r.workloadCache[hostname]; !ok { // Initialize the newly added hostname @@ -97,10 +108,12 @@ func (r *dnsController) processDomains(workload *workloadapi.Workload) { Workload: make([]*workloadapi.Workload, 0), RefreshRate: WorkloadDnsRefreshRate, } + log.Debugf("initialized DNS cache for hostname %s", hostname) } r.workloadCache[hostname].Workload = append( r.workloadCache[hostname].Workload, workload, ) + r.Unlock() // delete any scheduled re-resolve for domains we no longer care about r.dnsResolver.RemoveUnwatchDomain(domains) @@ -108,6 +121,7 @@ func (r *dnsController) processDomains(workload *workloadapi.Workload) { // update workloadCache with pendingResolveWorkload for k, v := range domains { if addresses := r.dnsResolver.GetDNSAddresses(k); addresses != nil { + log.Debugf("found cached DNS addresses for domain %s: %v", k, addresses) go r.updateWorkloads(v.(*pendingResolveDomain), k, addresses) } else { // Initialize the newly added hostname @@ -116,6 +130,7 @@ func (r *dnsController) processDomains(workload *workloadapi.Workload) { Domain: k, RefreshRate: v.(*pendingResolveDomain).RefreshRate, } + log.Infof("adding domain %s to DNS resolution queue", k) r.dnsResolver.AddDomainInQueue(domainInfo, 0) } } @@ -125,10 +140,24 @@ func (r *dnsController) refreshWorker(stop <-chan struct{}) { for { select { case <-stop: + log.Info("DNS refresh worker stopped") return case domain := <-r.dnsResolver.DnsChan: + if domain == "" { + log.Warn("received empty domain in refresh worker") + continue + } pendingDomain := r.getWorkloadsByDomain(domain) + if pendingDomain == nil { + log.Debugf("no pending workloads found for domain %s", domain) + continue + } addrs := r.dnsResolver.GetDNSAddresses(domain) + if len(addrs) == 0 { + log.Warnf("no DNS addresses found for domain %s", domain) + continue + } + log.Debugf("refreshing workloads for domain %s with addresses %v", domain, addrs) r.updateWorkloads(pendingDomain, domain, addrs) } } @@ -144,15 +173,6 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom isWorkerUpdate := false for _, workload := range pendingDomain.Workload { if ready, newWorkload := r.overwriteDnsWorkload(workload, domain, addrs); ready { - // uid := workload.GetUid() - // if r.cache.GetWorkloadByUid(uid) != nil { - // r.cache.AddOrUpdateWorkload(newWorkload) - // delete(r.workloadCache, domain) - // isWorkerUpdate = true - // } - // if _, ok := r.workloadCache[domain]; !ok { - // continue - // } uid := newWorkload.GetUid() if _, ok := r.ResolvedDomainChanMap[uid]; ok { r.cache.AddOrUpdateWorkload(newWorkload) @@ -167,27 +187,39 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom } if isWorkerUpdate { - // log.Info("some workloads has been updated") - // TODO: flush the bpf map - // r.cache.Flush() + // TODO: flush the bpf map if needed return } } func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) { - hostName := r.pendingHostnames[workload.GetName()] + // Verify the domain matches the workload's hostname + if workload.GetHostname() != domain { + log.Warnf("domain mismatch: workload hostname %s != domain %s", workload.GetHostname(), domain) + return false, nil + } - if addresses := r.dnsResolver.GetDNSAddresses(hostName); addresses != nil { - newWorkload := cloneWorkload(workload) - for _, addr := range addrs { - if ip := net.ParseIP(addr); ip != nil && ip.To4() != nil { - newWorkload.Addresses = append(newWorkload.Addresses, netip.MustParseAddr(addr).AsSlice()) - } + if len(addrs) == 0 { + log.Warnf("no addresses provided for domain %s", domain) + return false, nil + } + + newWorkload := cloneWorkload(workload) + for _, addr := range addrs { + if ip := net.ParseIP(addr); ip != nil { + // Support both IPv4 and IPv6 addresses + newWorkload.Addresses = append(newWorkload.Addresses, netip.MustParseAddr(addr).AsSlice()) + } else { + log.Warnf("invalid IP address: %s for domain %s", addr, domain) } - return true, newWorkload } - return false, nil + if len(newWorkload.Addresses) == 0 { + log.Warnf("no valid addresses after parsing for domain %s", domain) + return false, nil + } + + return true, newWorkload } func getPendingResolveDomain(workload *workloadapi.Workload) map[string]any { @@ -203,7 +235,6 @@ func getPendingResolveDomain(workload *workloadapi.Workload) map[string]any { return domains } - // log.Infof("getPendingResolveDomain: processing workload %s with hostname %s", workload.ResourceName(), hostname) if v, ok := domains[hostname]; ok { v.(*pendingResolveDomain).Workload = append(v.(*pendingResolveDomain).Workload, workload) } else { diff --git a/pkg/controller/workload/dns_test.go b/pkg/controller/workload/dns_test.go index 258f411b0..7ef5fef48 100644 --- a/pkg/controller/workload/dns_test.go +++ b/pkg/controller/workload/dns_test.go @@ -32,12 +32,13 @@ import ( "kmesh.net/kmesh/pkg/dns" ) -func TestOverwriteDnsWorkload(t *testing.T) { +func TestOverwriteDnsWorkloadWithIPv4(t *testing.T) { domain := "example.com" + // Test with only IPv4 addresses addrs := []string{"192.168.1.1", "192.168.1.2", "10.0.0.1"} workload := &workloadapi.Workload{ - Uid: "test-uid", - Name: "test-workload", + Uid: "test-uid-ipv4", + Name: "test-workload-ipv4", Hostname: domain, } @@ -52,30 +53,83 @@ func TestOverwriteDnsWorkload(t *testing.T) { assert.NoError(t, err) p.DnsResolverChan = dnsController.workloadsChan - dnsController.pendingHostnames = map[string]string{ - workload.GetName(): domain, + ready, newWorkload := dnsController.overwriteDnsWorkload(workload, domain, addrs) + assert.True(t, ready, "should successfully process IPv4 addresses") + + // Verify all IPv4 addresses are correctly added + expectedAddrs := []string{"192.168.1.1", "192.168.1.2", "10.0.0.1"} + actualAddrs := make([]string, 0, len(newWorkload.Addresses)) + for _, addr := range newWorkload.Addresses { + ip, _ := netip.AddrFromSlice(addr) + actualAddrs = append(actualAddrs, ip.String()) } + assert.Equal(t, expectedAddrs, actualAddrs) +} - patches := gomonkey.NewPatches() - defer patches.Reset() - patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "GetDNSAddresses", - func(_ *dns.DNSResolver, name string) []string { - return addrs - }) +func TestOverwriteDnsWorkloadWithIPv6(t *testing.T) { + domain := "example.com" + // Test with only IPv6 addresses + addrs := []string{"2001:db8::1", "fe80::1", "2001:db8::2"} + workload := &workloadapi.Workload{ + Uid: "test-uid-ipv6", + Name: "test-workload-ipv6", + Hostname: domain, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + + ready, newWorkload := dnsController.overwriteDnsWorkload(workload, domain, addrs) + assert.True(t, ready, "should successfully process IPv6 addresses") + + // Verify all IPv6 addresses are correctly added + expectedAddrs := []string{"2001:db8::1", "fe80::1", "2001:db8::2"} + actualAddrs := make([]string, 0, len(newWorkload.Addresses)) + for _, addr := range newWorkload.Addresses { + ip, _ := netip.AddrFromSlice(addr) + actualAddrs = append(actualAddrs, ip.String()) + } + assert.Equal(t, expectedAddrs, actualAddrs) +} + +func TestOverwriteDnsWorkloadWithMixedAddresses(t *testing.T) { + domain := "example.com" + // Test with both IPv4 and IPv6 addresses mixed + addrs := []string{"192.168.1.1", "2001:db8::1", "10.0.0.1", "fe80::1"} + workload := &workloadapi.Workload{ + Uid: "test-uid-mixed", + Name: "test-workload-mixed", + Hostname: domain, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) ready, newWorkload := dnsController.overwriteDnsWorkload(workload, domain, addrs) - assert.Equal(t, true, ready) - - if ready { - // Verify only IPv4 addresses are added based on current filtering logic - expectedAddrs := []string{"192.168.1.1", "192.168.1.2", "10.0.0.1"} - actualAddrs := make([]string, 0, len(newWorkload.Addresses)) - for _, addr := range newWorkload.Addresses { - ip, _ := netip.AddrFromSlice(addr) - actualAddrs = append(actualAddrs, ip.String()) - } - assert.Equal(t, expectedAddrs, actualAddrs) + assert.True(t, ready, "should successfully process mixed IPv4 and IPv6 addresses") + + // Verify both IPv4 and IPv6 addresses are correctly added + expectedAddrs := []string{"192.168.1.1", "2001:db8::1", "10.0.0.1", "fe80::1"} + actualAddrs := make([]string, 0, len(newWorkload.Addresses)) + for _, addr := range newWorkload.Addresses { + ip, _ := netip.AddrFromSlice(addr) + actualAddrs = append(actualAddrs, ip.String()) } + assert.Equal(t, expectedAddrs, actualAddrs) } func TestHandleWorkloadsWithDns(t *testing.T) { diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 0994e1e37..cf89814e9 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -946,6 +946,11 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, select { case <-time.After(3 * time.Second): log.Warnf("address resolving for workload %s/%s/%s timeout, skip handling", workload.Namespace, workload.Name, uid) + // Cleanup to prevent memory leak + if ch, ok := p.ResolvedDomainChanMap[uid]; ok { + close(ch) + delete(p.ResolvedDomainChanMap, uid) + } continue case newWorkload := <-p.ResolvedDomainChanMap[uid]: if address := newWorkload.GetAddresses(); address == nil { From 9fce1d68dd3c1f9443b8c787b0e0e2bb14da914b Mon Sep 17 00:00:00 2001 From: Tom Date: Tue, 14 Oct 2025 15:58:01 +0000 Subject: [PATCH 12/14] refactor: enhance DNS workload management and cleanup on deletion Signed-off-by: Tom --- .githooks/pre-commit | 33 +++++ pkg/controller/workload/dns.go | 115 +++++++++++------- .../workload/workload_controller.go | 6 + pkg/controller/workload/workload_processor.go | 8 ++ 4 files changed, 120 insertions(+), 42 deletions(-) create mode 100755 .githooks/pre-commit diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 000000000..700534ad4 --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,33 @@ +#!/bin/bash +# +# Kmesh pre-commit hook +# This hook runs 'make clean' and 'make gen-check' before each commit +# to ensure generated files are up-to-date and temporary files are cleaned up. +# + +set -e + +echo "Running pre-commit checks..." + +# Change to repository root +REPO_ROOT=$(git rev-parse --show-toplevel) +cd "$REPO_ROOT" + +# Run make clean to restore auto-generated files +echo "→ Running 'make clean'..." +if ! make clean; then + echo "❌ 'make clean' failed" + exit 1 +fi + +# Run make gen-check to verify generated files are up-to-date +echo "→ Running 'make gen-check'..." +if ! make gen-check; then + echo "❌ 'make gen-check' failed" + echo "" + echo "Generated files are out of date. Please run 'make gen' and commit the changes." + exit 1 +fi + +echo "✅ Pre-commit checks passed" +exit 0 diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index 7106be1f8..4d98439f8 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -30,7 +30,8 @@ import ( ) const ( - WorkloadDnsRefreshRate = 200 * time.Millisecond // 200ms, used for workload dns refresh rate + WorkloadDnsRefreshRate = 200 * time.Millisecond // DNS refresh rate for workloads + WorkloadChannelSendTimeout = 100 * time.Millisecond // Timeout for sending to workload channel ) type dnsController struct { @@ -40,15 +41,12 @@ type dnsController struct { workloadsChan chan *workloadapi.Workload ResolvedDomainChanMap map[string]chan *workloadapi.Workload - // store the copy of pendingResolveWorkload. - workloadCache map[string]*pendingResolveDomain - // store all pending hostnames in the workloads - pendingHostnames map[string]string + workloadCache map[string]*pendingResolveDomain // hostname -> pending workloads + pendingHostnames map[string]string // workload name -> hostname sync.RWMutex } -// pending resolve domain info of Dual-Engine Mode, -// workload is used for create the apiworkload +// pendingResolveDomain stores workloads pending DNS resolution for a domain type pendingResolveDomain struct { Workload []*workloadapi.Workload RefreshRate time.Duration @@ -67,7 +65,6 @@ func NewDnsController(cache cache.WorkloadCache) (*dnsController, error) { workloadCache: make(map[string]*pendingResolveDomain), pendingHostnames: make(map[string]string), } - dnsController.newWorkloadCache() return dnsController, nil } @@ -85,6 +82,8 @@ func (r *dnsController) Run(stopCh <-chan struct{}) { }() } +// processDomains processes workloads requiring DNS resolution. +// Thread-safe: uses locks to protect shared data structures. func (r *dnsController) processDomains(workload *workloadapi.Workload) { if workload == nil { log.Warn("received nil workload in processDomains") @@ -93,7 +92,6 @@ func (r *dnsController) processDomains(workload *workloadapi.Workload) { domains := getPendingResolveDomain(workload) if len(domains) == 0 { - log.Debugf("no domains to resolve for workload %s/%s", workload.Namespace, workload.Name) return } @@ -103,29 +101,22 @@ func (r *dnsController) processDomains(workload *workloadapi.Workload) { r.Lock() r.pendingHostnames[workloadName] = hostname if _, ok := r.workloadCache[hostname]; !ok { - // Initialize the newly added hostname r.workloadCache[hostname] = &pendingResolveDomain{ Workload: make([]*workloadapi.Workload, 0), RefreshRate: WorkloadDnsRefreshRate, } - log.Debugf("initialized DNS cache for hostname %s", hostname) } r.workloadCache[hostname].Workload = append( r.workloadCache[hostname].Workload, workload, ) r.Unlock() - // delete any scheduled re-resolve for domains we no longer care about r.dnsResolver.RemoveUnwatchDomain(domains) - // update workloadCache with pendingResolveWorkload for k, v := range domains { if addresses := r.dnsResolver.GetDNSAddresses(k); addresses != nil { - log.Debugf("found cached DNS addresses for domain %s: %v", k, addresses) go r.updateWorkloads(v.(*pendingResolveDomain), k, addresses) } else { - // Initialize the newly added hostname - // and add it to the dns queue to be resolved. domainInfo := &dns.DomainInfo{ Domain: k, RefreshRate: v.(*pendingResolveDomain).RefreshRate, @@ -144,78 +135,100 @@ func (r *dnsController) refreshWorker(stop <-chan struct{}) { return case domain := <-r.dnsResolver.DnsChan: if domain == "" { - log.Warn("received empty domain in refresh worker") continue } + pendingDomain := r.getWorkloadsByDomain(domain) if pendingDomain == nil { - log.Debugf("no pending workloads found for domain %s", domain) continue } + addrs := r.dnsResolver.GetDNSAddresses(domain) if len(addrs) == 0 { log.Warnf("no DNS addresses found for domain %s", domain) continue } - log.Debugf("refreshing workloads for domain %s with addresses %v", domain, addrs) + r.updateWorkloads(pendingDomain, domain, addrs) } } } +// updateWorkloads processes DNS resolution results and updates workloads. +// Uses fine-grained locking to avoid deadlocks. Supports both IPv4 and IPv6. func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, domain string, addrs []string) { if pendingDomain == nil || addrs == nil { return } - r.Lock() - defer r.Unlock() - isWorkerUpdate := false + // Process workloads without holding the lock + var readyWorkloads []*workloadapi.Workload for _, workload := range pendingDomain.Workload { if ready, newWorkload := r.overwriteDnsWorkload(workload, domain, addrs); ready { - uid := newWorkload.GetUid() - if _, ok := r.ResolvedDomainChanMap[uid]; ok { - r.cache.AddOrUpdateWorkload(newWorkload) - r.ResolvedDomainChanMap[uid] <- newWorkload - log.Infof("workload %s/%s/%s addresses updated to %v", newWorkload.Namespace, newWorkload.Name, uid, newWorkload.Addresses) + readyWorkloads = append(readyWorkloads, newWorkload) + } + } + + // Send to channels without holding lock to prevent deadlock + for _, newWorkload := range readyWorkloads { + uid := newWorkload.GetUid() + + r.Lock() + ch, ok := r.ResolvedDomainChanMap[uid] + r.Unlock() + + if ok { + r.cache.AddOrUpdateWorkload(newWorkload) + select { + case ch <- newWorkload: + log.Infof("workload %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, newWorkload.Addresses) + case <-time.After(WorkloadChannelSendTimeout): + log.Warnf("timeout sending resolved workload %s/%s", newWorkload.Namespace, newWorkload.Name) + } + + r.Lock() + if _, stillExists := r.ResolvedDomainChanMap[uid]; stillExists { close(r.ResolvedDomainChanMap[uid]) delete(r.ResolvedDomainChanMap, uid) - delete(r.workloadCache, domain) - isWorkerUpdate = true } + r.Unlock() } } - if isWorkerUpdate { - // TODO: flush the bpf map if needed - return + // Clean up domain cache + if len(readyWorkloads) > 0 { + r.Lock() + delete(r.workloadCache, domain) + r.Unlock() } } +// overwriteDnsWorkload creates a new workload with resolved IP addresses. +// Supports both IPv4 and IPv6 addresses. func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) { - // Verify the domain matches the workload's hostname if workload.GetHostname() != domain { log.Warnf("domain mismatch: workload hostname %s != domain %s", workload.GetHostname(), domain) return false, nil } if len(addrs) == 0 { - log.Warnf("no addresses provided for domain %s", domain) return false, nil } newWorkload := cloneWorkload(workload) + validAddrs := 0 + for _, addr := range addrs { if ip := net.ParseIP(addr); ip != nil { - // Support both IPv4 and IPv6 addresses newWorkload.Addresses = append(newWorkload.Addresses, netip.MustParseAddr(addr).AsSlice()) + validAddrs++ } else { - log.Warnf("invalid IP address: %s for domain %s", addr, domain) + log.Warnf("invalid IP address %s for domain %s", addr, domain) } } if len(newWorkload.Addresses) == 0 { - log.Warnf("no valid addresses after parsing for domain %s", domain) + log.Warnf("no valid addresses for domain %s", domain) return false, nil } @@ -231,7 +244,6 @@ func getPendingResolveDomain(workload *workloadapi.Workload) map[string]any { } if _, err := netip.ParseAddr(hostname); err == nil { - // This is an ip address return domains } @@ -248,15 +260,34 @@ func getPendingResolveDomain(workload *workloadapi.Workload) map[string]any { return domains } -func (r *dnsController) newWorkloadCache() { +// removeWorkloadFromDnsCache removes a specific workload from DNS cache. +// Called when a workload is deleted from the system. +func (r *dnsController) removeWorkloadFromDnsCache(workloadName string) { r.Lock() defer r.Unlock() - if r.workloadCache != nil { - log.Debug("clean up dns workloads") - r.workloadCache = map[string]*pendingResolveDomain{} + hostname, exists := r.pendingHostnames[workloadName] + if !exists { return } + + delete(r.pendingHostnames, workloadName) + + if pendingDomain, ok := r.workloadCache[hostname]; ok { + updatedWorkloads := make([]*workloadapi.Workload, 0) + for _, wl := range pendingDomain.Workload { + if wl.GetName() != workloadName { + updatedWorkloads = append(updatedWorkloads, wl) + } + } + + if len(updatedWorkloads) == 0 { + delete(r.workloadCache, hostname) + r.dnsResolver.RemoveUnwatchDomain(map[string]any{hostname: nil}) + } else { + pendingDomain.Workload = updatedWorkloads + } + } } func (r *dnsController) getWorkloadsByDomain(domain string) *pendingResolveDomain { diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 4ed6e9bd1..0693c4d83 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -57,6 +57,12 @@ func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfM } processor.DnsResolverChan = dnsResolverController.workloadsChan processor.ResolvedDomainChanMap = dnsResolverController.ResolvedDomainChanMap + + // Set up callback to clean DNS cache when workload is deleted + processor.onWorkloadDeleted = func(workloadName string) { + dnsResolverController.removeWorkloadFromDnsCache(workloadName) + } + c := &Controller{ Processor: processor, bpfWorkloadObj: bpfWorkload, diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index cf89814e9..06ab4ddc1 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -76,6 +76,8 @@ type Processor struct { DnsResolverChan chan *workloadapi.Workload ResolvedDomainChanMap map[string]chan *workloadapi.Workload + // Callback to remove workload from DNS cache when workload is deleted + onWorkloadDeleted func(workloadName string) } func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { @@ -248,6 +250,12 @@ func (p *Processor) removeWorkload(uid string) error { if wl == nil { return nil } + + // Clean up DNS cache for this workload if it was pending DNS resolution + if p.onWorkloadDeleted != nil && wl.GetName() != "" { + p.onWorkloadDeleted(wl.GetName()) + } + p.WorkloadCache.DeleteWorkload(uid) telemetry.DeleteWorkloadMetric(wl) return p.removeWorkloadFromBpfMap(wl) From a504a7cc44cf1165e3d519542aa57ccea3fcd909 Mon Sep 17 00:00:00 2001 From: Tom Date: Wed, 15 Oct 2025 05:44:12 +0000 Subject: [PATCH 13/14] refactor: improve DNS resolution handling and timeout management Signed-off-by: Tom --- pkg/controller/workload/dns.go | 45 ++++++++----------- pkg/controller/workload/workload_processor.go | 22 ++++----- test/e2e/baseline_test.go | 7 --- 3 files changed, 31 insertions(+), 43 deletions(-) diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go index 4d98439f8..2528f5932 100644 --- a/pkg/controller/workload/dns.go +++ b/pkg/controller/workload/dns.go @@ -17,7 +17,6 @@ package workload import ( - "net" "net/netip" "sync" "time" @@ -83,7 +82,6 @@ func (r *dnsController) Run(stopCh <-chan struct{}) { } // processDomains processes workloads requiring DNS resolution. -// Thread-safe: uses locks to protect shared data structures. func (r *dnsController) processDomains(workload *workloadapi.Workload) { if workload == nil { log.Warn("received nil workload in processDomains") @@ -111,15 +109,20 @@ func (r *dnsController) processDomains(workload *workloadapi.Workload) { ) r.Unlock() - r.dnsResolver.RemoveUnwatchDomain(domains) + // Convert to map[string]any for RemoveUnwatchDomain + unwatchDomains := make(map[string]any, len(domains)) + for k := range domains { + unwatchDomains[k] = nil + } + r.dnsResolver.RemoveUnwatchDomain(unwatchDomains) for k, v := range domains { if addresses := r.dnsResolver.GetDNSAddresses(k); addresses != nil { - go r.updateWorkloads(v.(*pendingResolveDomain), k, addresses) + go r.updateWorkloads(v, k, addresses) } else { domainInfo := &dns.DomainInfo{ Domain: k, - RefreshRate: v.(*pendingResolveDomain).RefreshRate, + RefreshRate: v.RefreshRate, } log.Infof("adding domain %s to DNS resolution queue", k) r.dnsResolver.AddDomainInQueue(domainInfo, 0) @@ -155,13 +158,11 @@ func (r *dnsController) refreshWorker(stop <-chan struct{}) { } // updateWorkloads processes DNS resolution results and updates workloads. -// Uses fine-grained locking to avoid deadlocks. Supports both IPv4 and IPv6. func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, domain string, addrs []string) { - if pendingDomain == nil || addrs == nil { + if pendingDomain == nil || len(addrs) == 0 { return } - // Process workloads without holding the lock var readyWorkloads []*workloadapi.Workload for _, workload := range pendingDomain.Workload { if ready, newWorkload := r.overwriteDnsWorkload(workload, domain, addrs); ready { @@ -195,7 +196,6 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom } } - // Clean up domain cache if len(readyWorkloads) > 0 { r.Lock() delete(r.workloadCache, domain) @@ -204,7 +204,6 @@ func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, dom } // overwriteDnsWorkload creates a new workload with resolved IP addresses. -// Supports both IPv4 and IPv6 addresses. func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) { if workload.GetHostname() != domain { log.Warnf("domain mismatch: workload hostname %s != domain %s", workload.GetHostname(), domain) @@ -216,45 +215,39 @@ func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, dom } newWorkload := cloneWorkload(workload) - validAddrs := 0 for _, addr := range addrs { - if ip := net.ParseIP(addr); ip != nil { - newWorkload.Addresses = append(newWorkload.Addresses, netip.MustParseAddr(addr).AsSlice()) - validAddrs++ + if parsedAddr, err := netip.ParseAddr(addr); err == nil { + newWorkload.Addresses = append(newWorkload.Addresses, parsedAddr.AsSlice()) } else { - log.Warnf("invalid IP address %s for domain %s", addr, domain) + log.Warnf("invalid IP address %s for domain %s: %v", addr, domain, err) } } if len(newWorkload.Addresses) == 0 { - log.Warnf("no valid addresses for domain %s", domain) + log.Warnf("no valid addresses resolved for domain %s", domain) return false, nil } return true, newWorkload } -func getPendingResolveDomain(workload *workloadapi.Workload) map[string]any { - domains := make(map[string]any) +func getPendingResolveDomain(workload *workloadapi.Workload) map[string]*pendingResolveDomain { + domains := make(map[string]*pendingResolveDomain) hostname := workload.GetHostname() if hostname == "" { return domains } + // Skip if hostname is already an IP address if _, err := netip.ParseAddr(hostname); err == nil { return domains } - if v, ok := domains[hostname]; ok { - v.(*pendingResolveDomain).Workload = append(v.(*pendingResolveDomain).Workload, workload) - } else { - domainWithRefreshRate := &pendingResolveDomain{ - Workload: []*workloadapi.Workload{workload}, - RefreshRate: WorkloadDnsRefreshRate, - } - domains[hostname] = domainWithRefreshRate + domains[hostname] = &pendingResolveDomain{ + Workload: []*workloadapi.Workload{workload}, + RefreshRate: WorkloadDnsRefreshRate, } return domains diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 06ab4ddc1..c9bdfcb80 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -48,6 +48,8 @@ import ( const ( KmeshWaypointPort = 15019 // use this fixed port instead of the HboneMtlsPort in kmesh + // dnsResolveTimeout is the timeout used when waiting for DNS resolution for workloads + dnsResolveTimeout = 3 * time.Second ) type Processor struct { @@ -943,31 +945,31 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, for _, workload := range workloads { if workload.GetAddresses() == nil { - log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) if p.DnsResolverChan == nil { + log.Warnf("workload %s/%s has nil addresses but DNS resolver is disabled", workload.Namespace, workload.Name) continue } + uid := workload.GetUid() p.ResolvedDomainChanMap[uid] = make(chan *workloadapi.Workload) p.DnsResolverChan <- workload - log.Infof("wait for workload %s/%s/%s address resolving", workload.Namespace, workload.Name, uid) + log.Infof("waiting for DNS resolution: %s/%s/%s", workload.Namespace, workload.Name, uid) + select { - case <-time.After(3 * time.Second): - log.Warnf("address resolving for workload %s/%s/%s timeout, skip handling", workload.Namespace, workload.Name, uid) - // Cleanup to prevent memory leak + case <-time.After(dnsResolveTimeout): + log.Warnf("DNS resolution timeout for workload %s/%s/%s, skip handling", workload.Namespace, workload.Name, uid) if ch, ok := p.ResolvedDomainChanMap[uid]; ok { close(ch) delete(p.ResolvedDomainChanMap, uid) } continue case newWorkload := <-p.ResolvedDomainChanMap[uid]: - if address := newWorkload.GetAddresses(); address == nil { - log.Warnf("workload: %s/%s resolved addresses is nil, skip handling", newWorkload.Namespace, newWorkload.Name) + if newWorkload == nil || newWorkload.GetAddresses() == nil { + log.Warnf("workload %s/%s resolved addresses is nil, skip handling", workload.Namespace, workload.Name) continue - } else { - log.Infof("workload: %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, address) - workload = newWorkload } + log.Infof("workload %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, newWorkload.Addresses) + workload = newWorkload } } diff --git a/test/e2e/baseline_test.go b/test/e2e/baseline_test.go index c507b40ce..a1dbc4a88 100644 --- a/test/e2e/baseline_test.go +++ b/test/e2e/baseline_test.go @@ -240,18 +240,12 @@ spec: // This simulates external service access using DNS resolution. func TestServiceEntryDNSResolution(t *testing.T) { runTest(t, func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions) { - // Only test HTTP traffic if opt.Scheme != scheme.HTTP { return } - // Need waypoint proxy for L7 processing if !dst.Config().HasServiceAddressedWaypointProxy() { return } - // Skip IPv6 for now as it's not fully supported - if net.ParseIP(dst.Address()).To4() == nil { - return - } const ( serviceEntryName = "external-svc-dns" @@ -259,7 +253,6 @@ func TestServiceEntryDNSResolution(t *testing.T) { ) // Use enrolled-to-kmesh service as the actual backend for ServiceEntry - // to avoid circular reference when dst is the service-with-waypoint itself backendService := apps.EnrolledToKmesh[0].Config().Service + "." + apps.Namespace.Name() + ".svc.cluster.local" servicePort := dst.Config().Ports.MustForName("http").ServicePort From b74f1066be9405efe25c3b4a06526b483ffd2104 Mon Sep 17 00:00:00 2001 From: Tom Date: Wed, 15 Oct 2025 08:49:50 +0000 Subject: [PATCH 14/14] refactor: update client and controller constructors to return errors Signed-off-by: Tom --- pkg/controller/client.go | 10 +++++++--- pkg/controller/client_test.go | 13 ++++++++----- pkg/controller/controller.go | 5 ++++- pkg/controller/workload/workload_controller.go | 7 +++---- pkg/controller/workload/workload_processor_test.go | 3 ++- test/e2e/baseline_test.go | 9 +++++++++ 6 files changed, 33 insertions(+), 14 deletions(-) diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 6f6c9a23a..37ebd6f1e 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -50,7 +50,7 @@ type XdsClient struct { xdsConfig *config.XdsConfig } -func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enableProfiling bool) *XdsClient { +func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enableProfiling bool) (*XdsClient, error) { client := &XdsClient{ mode: mode, xdsConfig: config.GetConfig(mode), @@ -58,14 +58,18 @@ func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWork switch mode { case constants.DualEngineMode: - client.WorkloadController = workload.NewController(bpfWorkload, enableMonitoring, enableProfiling) + var err error + client.WorkloadController, err = workload.NewController(bpfWorkload, enableMonitoring, enableProfiling) + if err != nil { + return nil, fmt.Errorf("failed to create workload controller: %w", err) + } case constants.KernelNativeMode: client.AdsController = ads.NewController(bpfAds) } client.ctx, client.cancel = context.WithCancel(context.Background()) client.ctx = metadata.AppendToOutgoingContext(client.ctx, "ClusterID", client.xdsConfig.Metadata.ClusterID.String()) - return client + return client, nil } func (c *XdsClient) createGrpcStreamClient() error { diff --git a/pkg/controller/client_test.go b/pkg/controller/client_test.go index df300115c..e6a6c1eae 100644 --- a/pkg/controller/client_test.go +++ b/pkg/controller/client_test.go @@ -38,7 +38,8 @@ import ( func TestRecoverConnection(t *testing.T) { t.Run("test reconnect success", func(t *testing.T) { - utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + utClient, err := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + assert.NoError(t, err) patches := gomonkey.NewPatches() defer patches.Reset() iteration := 0 @@ -79,8 +80,9 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) - err := utClient.createGrpcStreamClient() + utClient, err := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + assert.NoError(t, err) + err = utClient.createGrpcStreamClient() assert.NoError(t, err) reConnectPatches := gomonkey.NewPatches() @@ -126,8 +128,9 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.DualEngineMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) - err := utClient.createGrpcStreamClient() + utClient, err := NewXdsClient(constants.DualEngineMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + assert.NoError(t, err) + err = utClient.createGrpcStreamClient() assert.NoError(t, err) reConnectPatches := gomonkey.NewPatches() diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 4e5bf1b2d..17a0d53e8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -159,7 +159,10 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { } } - c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling) + c.client, err = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling) + if err != nil { + return fmt.Errorf("failed to create XDS client: %w", err) + } if c.client.WorkloadController != nil { if err := c.client.WorkloadController.Run(ctx, stopCh); err != nil { diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 0693c4d83..f83600ca3 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -48,12 +48,11 @@ type Controller struct { dnsResolverController *dnsController } -func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfMonitor bool) *Controller { +func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfMonitor bool) (*Controller, error) { processor := NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps) dnsResolverController, err := NewDnsController(processor.WorkloadCache) if err != nil { - log.Errorf("dns resolver of Dual-Engine mode create failed: %v", err) - return nil + return nil, fmt.Errorf("failed to create DNS resolver for Dual-Engine mode: %w", err) } processor.DnsResolverChan = dnsResolverController.workloadsChan processor.ResolvedDomainChanMap = dnsResolverController.ResolvedDomainChanMap @@ -79,7 +78,7 @@ func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfM c.OperationMetricController = telemetry.NewBpfProgMetric() c.MapMetricController = telemetry.NewMapMetric() } - return c + return c, nil } func (c *Controller) Run(ctx context.Context, stopCh <-chan struct{}) error { diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index 4aac25b5a..cdabb7121 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -359,7 +359,8 @@ func BenchmarkAddNewServicesWithWorkload(b *testing.B) { cleanup, bpfLoader := test.InitBpfMap(t, config) b.Cleanup(cleanup) - workloadController := NewController(bpfLoader.GetBpfWorkload(), false, false) + workloadController, err := NewController(bpfLoader.GetBpfWorkload(), false, false) + assert.NoError(t, err) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/test/e2e/baseline_test.go b/test/e2e/baseline_test.go index a1dbc4a88..ad5580b14 100644 --- a/test/e2e/baseline_test.go +++ b/test/e2e/baseline_test.go @@ -247,6 +247,15 @@ func TestServiceEntryDNSResolution(t *testing.T) { return } + // Skip this test in IPv6-only environment as DNS proxy is disabled for IPv6 + // (see test/e2e/run_test.sh: "skip dns proxy for ipv6") + // Without DNS proxy, fake hostname resolution won't work + v4, v6 := getSupportedIPFamilies(t) + if !v4 && v6 { + t.Skip("Skipping DNS resolution test in IPv6-only environment (DNS proxy disabled)") + return + } + const ( serviceEntryName = "external-svc-dns" fakeHostname = "foo.bar.com"