diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 000000000..700534ad4 --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,33 @@ +#!/bin/bash +# +# Kmesh pre-commit hook +# This hook runs 'make clean' and 'make gen-check' before each commit +# to ensure generated files are up-to-date and temporary files are cleaned up. +# + +set -e + +echo "Running pre-commit checks..." + +# Change to repository root +REPO_ROOT=$(git rev-parse --show-toplevel) +cd "$REPO_ROOT" + +# Run make clean to restore auto-generated files +echo "→ Running 'make clean'..." +if ! make clean; then + echo "❌ 'make clean' failed" + exit 1 +fi + +# Run make gen-check to verify generated files are up-to-date +echo "→ Running 'make gen-check'..." +if ! make gen-check; then + echo "❌ 'make gen-check' failed" + echo "" + echo "Generated files are out of date. Please run 'make gen' and commit the changes." + exit 1 +fi + +echo "✅ Pre-commit checks passed" +exit 0 diff --git a/.gitignore b/.gitignore index 55e6e051d..15bb9f4d6 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,4 @@ oncn-mda/build/ oncn-mda/deploy/ test/mugen-master/ +.github/copilot-instructions.md diff --git a/README-zh.md b/README-zh.md index a628d16a9..24fe6660c 100644 --- a/README-zh.md +++ b/README-zh.md @@ -115,8 +115,8 @@ Kmesh创新性的提出将流量治理下沉OS,在数据路径上无需经过 time="2024-02-19T10:16:53Z" level=info msg="bpf Start successful" subsys=manager time="2024-02-19T10:16:53Z" level=info msg="controller Start successful" subsys=manager time="2024-02-19T10:16:53Z" level=info msg="command StartServer successful" subsys=manager - time="2024-02-19T10:16:53Z" level=info msg="start write CNI config\n" subsys="cni installer" - time="2024-02-19T10:16:53Z" level=info msg="kmesh cni use chained\n" subsys="cni installer" + time="2024-02-19T10:16:53Z" level=info msg="start write CNI config" subsys="cni installer" + time="2024-02-19T10:16:53Z" level=info msg="kmesh cni use chained" subsys="cni installer" time="2024-02-19T10:16:54Z" level=info msg="Copied /usr/bin/kmesh-cni to /opt/cni/bin." subsys="cni installer" time="2024-02-19T10:16:54Z" level=info msg="kubeconfig either does not exist or is out of date, writing a new one" subsys="cni installer" time="2024-02-19T10:16:54Z" level=info msg="wrote kubeconfig file /etc/cni/net.d/kmesh-cni-kubeconfig" subsys="cni installer" diff --git a/pkg/cni/install.go b/pkg/cni/install.go index 0054c1e5f..c57a69b5c 100644 --- a/pkg/cni/install.go +++ b/pkg/cni/install.go @@ -35,13 +35,13 @@ func (i *Installer) addCniConfig() error { if i.CniConfigChained { // "chained" is an cni type // information: www.cni.dev/docs/spec/#overview-1 - log.Infof("kmesh cni use chained\n") + log.Infof("kmesh cni use chained") err = i.chainedKmeshCniPlugin(i.Mode, i.CniMountNetEtcDIR) if err != nil { return err } } else { - log.Error("currently kmesh only support chained cni mode\n") + log.Error("currently kmesh only support chained cni mode") } return nil } diff --git a/pkg/controller/client.go b/pkg/controller/client.go index 8ea1b33a4..37ebd6f1e 100644 --- a/pkg/controller/client.go +++ b/pkg/controller/client.go @@ -50,21 +50,26 @@ type XdsClient struct { xdsConfig *config.XdsConfig } -func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enableProfiling bool) *XdsClient { +func NewXdsClient(mode string, bpfAds *bpfads.BpfAds, bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enableProfiling bool) (*XdsClient, error) { client := &XdsClient{ mode: mode, xdsConfig: config.GetConfig(mode), } - if mode == constants.DualEngineMode { - client.WorkloadController = workload.NewController(bpfWorkload, enableMonitoring, enableProfiling) - } else if mode == constants.KernelNativeMode { + switch mode { + case constants.DualEngineMode: + var err error + client.WorkloadController, err = workload.NewController(bpfWorkload, enableMonitoring, enableProfiling) + if err != nil { + return nil, fmt.Errorf("failed to create workload controller: %w", err) + } + case constants.KernelNativeMode: client.AdsController = ads.NewController(bpfAds) } client.ctx, client.cancel = context.WithCancel(context.Background()) client.ctx = metadata.AppendToOutgoingContext(client.ctx, "ClusterID", client.xdsConfig.Metadata.ClusterID.String()) - return client + return client, nil } func (c *XdsClient) createGrpcStreamClient() error { diff --git a/pkg/controller/client_test.go b/pkg/controller/client_test.go index df300115c..e6a6c1eae 100644 --- a/pkg/controller/client_test.go +++ b/pkg/controller/client_test.go @@ -38,7 +38,8 @@ import ( func TestRecoverConnection(t *testing.T) { t.Run("test reconnect success", func(t *testing.T) { - utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + utClient, err := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + assert.NoError(t, err) patches := gomonkey.NewPatches() defer patches.Reset() iteration := 0 @@ -79,8 +80,9 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) - err := utClient.createGrpcStreamClient() + utClient, err := NewXdsClient(constants.KernelNativeMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + assert.NoError(t, err) + err = utClient.createGrpcStreamClient() assert.NoError(t, err) reConnectPatches := gomonkey.NewPatches() @@ -126,8 +128,9 @@ func TestClientResponseProcess(t *testing.T) { })) }) - utClient := NewXdsClient(constants.DualEngineMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) - err := utClient.createGrpcStreamClient() + utClient, err := NewXdsClient(constants.DualEngineMode, &bpfads.BpfAds{}, &bpfwl.BpfWorkload{}, false, false) + assert.NoError(t, err) + err = utClient.createGrpcStreamClient() assert.NoError(t, err) reConnectPatches := gomonkey.NewPatches() diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9e2b90862..17a0d53e8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -159,10 +159,13 @@ func (c *Controller) Start(stopCh <-chan struct{}) error { } } - c.client = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling) + c.client, err = NewXdsClient(c.mode, c.bpfAdsObj, c.bpfWorkloadObj, c.bpfConfig.EnableMonitoring, c.bpfConfig.EnableProfiling) + if err != nil { + return fmt.Errorf("failed to create XDS client: %w", err) + } if c.client.WorkloadController != nil { - if err := c.client.WorkloadController.Run(ctx); err != nil { + if err := c.client.WorkloadController.Run(ctx, stopCh); err != nil { return fmt.Errorf("failed to start workload controller: %+v", err) } } else { diff --git a/pkg/controller/workload/dns.go b/pkg/controller/workload/dns.go new file mode 100644 index 000000000..2528f5932 --- /dev/null +++ b/pkg/controller/workload/dns.go @@ -0,0 +1,304 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workload + +import ( + "net/netip" + "sync" + "time" + + "google.golang.org/protobuf/proto" + + "kmesh.net/kmesh/api/v2/workloadapi" + "kmesh.net/kmesh/pkg/controller/workload/cache" + "kmesh.net/kmesh/pkg/dns" +) + +const ( + WorkloadDnsRefreshRate = 200 * time.Millisecond // DNS refresh rate for workloads + WorkloadChannelSendTimeout = 100 * time.Millisecond // Timeout for sending to workload channel +) + +type dnsController struct { + cache cache.WorkloadCache + dnsResolver *dns.DNSResolver + + workloadsChan chan *workloadapi.Workload + ResolvedDomainChanMap map[string]chan *workloadapi.Workload + + workloadCache map[string]*pendingResolveDomain // hostname -> pending workloads + pendingHostnames map[string]string // workload name -> hostname + sync.RWMutex +} + +// pendingResolveDomain stores workloads pending DNS resolution for a domain +type pendingResolveDomain struct { + Workload []*workloadapi.Workload + RefreshRate time.Duration +} + +func NewDnsController(cache cache.WorkloadCache) (*dnsController, error) { + resolver, err := dns.NewDNSResolver() + if err != nil { + return nil, err + } + dnsController := &dnsController{ + cache: cache, + dnsResolver: resolver, + workloadsChan: make(chan *workloadapi.Workload), + ResolvedDomainChanMap: make(map[string]chan *workloadapi.Workload), + workloadCache: make(map[string]*pendingResolveDomain), + pendingHostnames: make(map[string]string), + } + return dnsController, nil +} + +func (r *dnsController) Run(stopCh <-chan struct{}) { + go r.dnsResolver.StartDnsResolver(stopCh) + go r.refreshWorker(stopCh) + go func() { + for workload := range r.workloadsChan { + r.processDomains(workload) + } + }() + go func() { + <-stopCh + close(r.workloadsChan) + }() +} + +// processDomains processes workloads requiring DNS resolution. +func (r *dnsController) processDomains(workload *workloadapi.Workload) { + if workload == nil { + log.Warn("received nil workload in processDomains") + return + } + + domains := getPendingResolveDomain(workload) + if len(domains) == 0 { + return + } + + workloadName := workload.GetName() + hostname := workload.GetHostname() + + r.Lock() + r.pendingHostnames[workloadName] = hostname + if _, ok := r.workloadCache[hostname]; !ok { + r.workloadCache[hostname] = &pendingResolveDomain{ + Workload: make([]*workloadapi.Workload, 0), + RefreshRate: WorkloadDnsRefreshRate, + } + } + r.workloadCache[hostname].Workload = append( + r.workloadCache[hostname].Workload, workload, + ) + r.Unlock() + + // Convert to map[string]any for RemoveUnwatchDomain + unwatchDomains := make(map[string]any, len(domains)) + for k := range domains { + unwatchDomains[k] = nil + } + r.dnsResolver.RemoveUnwatchDomain(unwatchDomains) + + for k, v := range domains { + if addresses := r.dnsResolver.GetDNSAddresses(k); addresses != nil { + go r.updateWorkloads(v, k, addresses) + } else { + domainInfo := &dns.DomainInfo{ + Domain: k, + RefreshRate: v.RefreshRate, + } + log.Infof("adding domain %s to DNS resolution queue", k) + r.dnsResolver.AddDomainInQueue(domainInfo, 0) + } + } +} + +func (r *dnsController) refreshWorker(stop <-chan struct{}) { + for { + select { + case <-stop: + log.Info("DNS refresh worker stopped") + return + case domain := <-r.dnsResolver.DnsChan: + if domain == "" { + continue + } + + pendingDomain := r.getWorkloadsByDomain(domain) + if pendingDomain == nil { + continue + } + + addrs := r.dnsResolver.GetDNSAddresses(domain) + if len(addrs) == 0 { + log.Warnf("no DNS addresses found for domain %s", domain) + continue + } + + r.updateWorkloads(pendingDomain, domain, addrs) + } + } +} + +// updateWorkloads processes DNS resolution results and updates workloads. +func (r *dnsController) updateWorkloads(pendingDomain *pendingResolveDomain, domain string, addrs []string) { + if pendingDomain == nil || len(addrs) == 0 { + return + } + + var readyWorkloads []*workloadapi.Workload + for _, workload := range pendingDomain.Workload { + if ready, newWorkload := r.overwriteDnsWorkload(workload, domain, addrs); ready { + readyWorkloads = append(readyWorkloads, newWorkload) + } + } + + // Send to channels without holding lock to prevent deadlock + for _, newWorkload := range readyWorkloads { + uid := newWorkload.GetUid() + + r.Lock() + ch, ok := r.ResolvedDomainChanMap[uid] + r.Unlock() + + if ok { + r.cache.AddOrUpdateWorkload(newWorkload) + select { + case ch <- newWorkload: + log.Infof("workload %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, newWorkload.Addresses) + case <-time.After(WorkloadChannelSendTimeout): + log.Warnf("timeout sending resolved workload %s/%s", newWorkload.Namespace, newWorkload.Name) + } + + r.Lock() + if _, stillExists := r.ResolvedDomainChanMap[uid]; stillExists { + close(r.ResolvedDomainChanMap[uid]) + delete(r.ResolvedDomainChanMap, uid) + } + r.Unlock() + } + } + + if len(readyWorkloads) > 0 { + r.Lock() + delete(r.workloadCache, domain) + r.Unlock() + } +} + +// overwriteDnsWorkload creates a new workload with resolved IP addresses. +func (r *dnsController) overwriteDnsWorkload(workload *workloadapi.Workload, domain string, addrs []string) (bool, *workloadapi.Workload) { + if workload.GetHostname() != domain { + log.Warnf("domain mismatch: workload hostname %s != domain %s", workload.GetHostname(), domain) + return false, nil + } + + if len(addrs) == 0 { + return false, nil + } + + newWorkload := cloneWorkload(workload) + + for _, addr := range addrs { + if parsedAddr, err := netip.ParseAddr(addr); err == nil { + newWorkload.Addresses = append(newWorkload.Addresses, parsedAddr.AsSlice()) + } else { + log.Warnf("invalid IP address %s for domain %s: %v", addr, domain, err) + } + } + + if len(newWorkload.Addresses) == 0 { + log.Warnf("no valid addresses resolved for domain %s", domain) + return false, nil + } + + return true, newWorkload +} + +func getPendingResolveDomain(workload *workloadapi.Workload) map[string]*pendingResolveDomain { + domains := make(map[string]*pendingResolveDomain) + + hostname := workload.GetHostname() + if hostname == "" { + return domains + } + + // Skip if hostname is already an IP address + if _, err := netip.ParseAddr(hostname); err == nil { + return domains + } + + domains[hostname] = &pendingResolveDomain{ + Workload: []*workloadapi.Workload{workload}, + RefreshRate: WorkloadDnsRefreshRate, + } + + return domains +} + +// removeWorkloadFromDnsCache removes a specific workload from DNS cache. +// Called when a workload is deleted from the system. +func (r *dnsController) removeWorkloadFromDnsCache(workloadName string) { + r.Lock() + defer r.Unlock() + + hostname, exists := r.pendingHostnames[workloadName] + if !exists { + return + } + + delete(r.pendingHostnames, workloadName) + + if pendingDomain, ok := r.workloadCache[hostname]; ok { + updatedWorkloads := make([]*workloadapi.Workload, 0) + for _, wl := range pendingDomain.Workload { + if wl.GetName() != workloadName { + updatedWorkloads = append(updatedWorkloads, wl) + } + } + + if len(updatedWorkloads) == 0 { + delete(r.workloadCache, hostname) + r.dnsResolver.RemoveUnwatchDomain(map[string]any{hostname: nil}) + } else { + pendingDomain.Workload = updatedWorkloads + } + } +} + +func (r *dnsController) getWorkloadsByDomain(domain string) *pendingResolveDomain { + r.RLock() + defer r.RUnlock() + + if r.workloadCache != nil { + if v, ok := r.workloadCache[domain]; ok { + return v + } + } + return nil +} + +func cloneWorkload(workload *workloadapi.Workload) *workloadapi.Workload { + if workload == nil { + return nil + } + workloadCopy := proto.Clone(workload).(*workloadapi.Workload) + return workloadCopy +} diff --git a/pkg/controller/workload/dns_test.go b/pkg/controller/workload/dns_test.go new file mode 100644 index 000000000..7ef5fef48 --- /dev/null +++ b/pkg/controller/workload/dns_test.go @@ -0,0 +1,341 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workload + +import ( + "net/netip" + "reflect" + "testing" + "time" + + "github.com/agiledragon/gomonkey/v2" + "github.com/stretchr/testify/assert" + "istio.io/istio/pkg/slices" + "istio.io/istio/pkg/test/util/retry" + + "kmesh.net/kmesh/api/v2/workloadapi" + "kmesh.net/kmesh/pkg/controller/workload/bpfcache" + "kmesh.net/kmesh/pkg/dns" +) + +func TestOverwriteDnsWorkloadWithIPv4(t *testing.T) { + domain := "example.com" + // Test with only IPv4 addresses + addrs := []string{"192.168.1.1", "192.168.1.2", "10.0.0.1"} + workload := &workloadapi.Workload{ + Uid: "test-uid-ipv4", + Name: "test-workload-ipv4", + Hostname: domain, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + p.DnsResolverChan = dnsController.workloadsChan + + ready, newWorkload := dnsController.overwriteDnsWorkload(workload, domain, addrs) + assert.True(t, ready, "should successfully process IPv4 addresses") + + // Verify all IPv4 addresses are correctly added + expectedAddrs := []string{"192.168.1.1", "192.168.1.2", "10.0.0.1"} + actualAddrs := make([]string, 0, len(newWorkload.Addresses)) + for _, addr := range newWorkload.Addresses { + ip, _ := netip.AddrFromSlice(addr) + actualAddrs = append(actualAddrs, ip.String()) + } + assert.Equal(t, expectedAddrs, actualAddrs) +} + +func TestOverwriteDnsWorkloadWithIPv6(t *testing.T) { + domain := "example.com" + // Test with only IPv6 addresses + addrs := []string{"2001:db8::1", "fe80::1", "2001:db8::2"} + workload := &workloadapi.Workload{ + Uid: "test-uid-ipv6", + Name: "test-workload-ipv6", + Hostname: domain, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + + ready, newWorkload := dnsController.overwriteDnsWorkload(workload, domain, addrs) + assert.True(t, ready, "should successfully process IPv6 addresses") + + // Verify all IPv6 addresses are correctly added + expectedAddrs := []string{"2001:db8::1", "fe80::1", "2001:db8::2"} + actualAddrs := make([]string, 0, len(newWorkload.Addresses)) + for _, addr := range newWorkload.Addresses { + ip, _ := netip.AddrFromSlice(addr) + actualAddrs = append(actualAddrs, ip.String()) + } + assert.Equal(t, expectedAddrs, actualAddrs) +} + +func TestOverwriteDnsWorkloadWithMixedAddresses(t *testing.T) { + domain := "example.com" + // Test with both IPv4 and IPv6 addresses mixed + addrs := []string{"192.168.1.1", "2001:db8::1", "10.0.0.1", "fe80::1"} + workload := &workloadapi.Workload{ + Uid: "test-uid-mixed", + Name: "test-workload-mixed", + Hostname: domain, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + + ready, newWorkload := dnsController.overwriteDnsWorkload(workload, domain, addrs) + assert.True(t, ready, "should successfully process mixed IPv4 and IPv6 addresses") + + // Verify both IPv4 and IPv6 addresses are correctly added + expectedAddrs := []string{"192.168.1.1", "2001:db8::1", "10.0.0.1", "fe80::1"} + actualAddrs := make([]string, 0, len(newWorkload.Addresses)) + for _, addr := range newWorkload.Addresses { + ip, _ := netip.AddrFromSlice(addr) + actualAddrs = append(actualAddrs, ip.String()) + } + assert.Equal(t, expectedAddrs, actualAddrs) +} + +func TestHandleWorkloadsWithDns(t *testing.T) { + workload1 := &workloadapi.Workload{ + Uid: "test-uid-1", + Name: "test-workload-1", + Hostname: "foo.bar", + } + workload2 := &workloadapi.Workload{ + Uid: "test-uid-2", + Name: "test-workload-2", + Hostname: "foo.baz", + Addresses: [][]byte{netip.MustParseAddr("192.168.1.1").AsSlice()}, + } + + testcases := []struct { + name string + workloads []*workloadapi.Workload + expected []string + }{ + { + name: "add workloads with DNS hostname", + workloads: []*workloadapi.Workload{workload1, workload2}, + expected: []string{"foo.bar"}, + }, + } + + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + stopCh := make(chan struct{}) + defer close(stopCh) + + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + dnsController.Run(stopCh) + p.DnsResolverChan = dnsController.workloadsChan + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Send workloads that need DNS resolution + for _, wl := range tc.workloads { + if wl.GetAddresses() == nil { + dnsController.workloadsChan <- wl + } + } + + // Verify pending domains are correct + retry.UntilOrFail(t, func() bool { + domains := make([]string, 0) + for _, wl := range tc.workloads { + if wl.GetAddresses() == nil { + result := getPendingResolveDomain(wl) + for domain := range result { + domains = append(domains, domain) + } + } + } + return slices.EqualUnordered(tc.expected, domains) + }, retry.Timeout(1*time.Second)) + }) + } +} + +func TestGetPendingResolveDomain(t *testing.T) { + tests := []struct { + name string + workload *workloadapi.Workload + expected []string + }{ + { + name: "valid hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-1", + Name: "test-workload", + Hostname: "example.com", + }, + expected: []string{"example.com"}, + }, + { + name: "empty hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-2", + Name: "test-workload-2", + Hostname: "", + }, + expected: []string{}, + }, + { + name: "ip address as hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-3", + Name: "test-workload-3", + Hostname: "192.168.1.1", + }, + expected: []string{}, + }, + { + name: "ipv6 address as hostname", + workload: &workloadapi.Workload{ + Uid: "test-uid-4", + Name: "test-workload-4", + Hostname: "2001:db8::1", + }, + expected: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getPendingResolveDomain(tt.workload) + domains := make([]string, 0, len(result)) + for domain := range result { + domains = append(domains, domain) + } + assert.True(t, slices.EqualUnordered(tt.expected, domains)) + }) + } +} + +func TestDnsController_ProcessDomains(t *testing.T) { + workloadMap := bpfcache.NewFakeWorkloadMap(t) + defer bpfcache.CleanupFakeWorkloadMap(workloadMap) + + p := NewProcessor(workloadMap) + dnsController, err := NewDnsController(p.WorkloadCache) + assert.NoError(t, err) + + workload := &workloadapi.Workload{ + Uid: "test-uid", + Name: "test-workload", + Hostname: "example.com", + } + + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "GetDNSAddresses", + func(_ *dns.DNSResolver, domain string) []string { + return nil // Mock initial cache miss scenario + }) + patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "RemoveUnwatchDomain", + func(_ *dns.DNSResolver, domains map[string]any) { + // Mock domain unwatch removal + }) + patches.ApplyMethod(reflect.TypeOf(dnsController.dnsResolver), "AddDomainInQueue", + func(_ *dns.DNSResolver, domainInfo *dns.DomainInfo, delay time.Duration) { + // Mock adding domain to queue + }) + + // Test domain processing + dnsController.processDomains(workload) + + // Verify workloadCache is updated + assert.Contains(t, dnsController.workloadCache, "example.com") + assert.Contains(t, dnsController.pendingHostnames, "test-workload") + assert.Equal(t, "example.com", dnsController.pendingHostnames["test-workload"]) + + pendingDomain := dnsController.workloadCache["example.com"] + assert.NotNil(t, pendingDomain) + assert.Equal(t, 1, len(pendingDomain.Workload)) + assert.Equal(t, workload, pendingDomain.Workload[0]) + assert.Equal(t, WorkloadDnsRefreshRate, pendingDomain.RefreshRate) +} + +func TestCloneWorkload(t *testing.T) { + tests := []struct { + name string + workload *workloadapi.Workload + wantNil bool + }{ + { + name: "nil workload", + workload: nil, + wantNil: true, + }, + { + name: "valid workload", + workload: &workloadapi.Workload{ + Uid: "test-uid", + Name: "test-workload", + Hostname: "example.com", + Addresses: [][]byte{ + netip.MustParseAddr("192.168.1.1").AsSlice(), + }, + }, + wantNil: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := cloneWorkload(tt.workload) + + if tt.wantNil { + assert.Nil(t, result) + } else { + assert.NotNil(t, result) + assert.Equal(t, tt.workload.Uid, result.Uid) + assert.Equal(t, tt.workload.Name, result.Name) + assert.Equal(t, tt.workload.Hostname, result.Hostname) + assert.Equal(t, len(tt.workload.Addresses), len(result.Addresses)) + + // Verify different object instances + assert.NotSame(t, tt.workload, result) + } + }) + } +} diff --git a/pkg/controller/workload/workload_controller.go b/pkg/controller/workload/workload_controller.go index 3facbe89b..f83600ca3 100644 --- a/pkg/controller/workload/workload_controller.go +++ b/pkg/controller/workload/workload_controller.go @@ -45,12 +45,27 @@ type Controller struct { MapMetricController *telemetry.MapMetricController OperationMetricController *telemetry.BpfProgMetric bpfWorkloadObj *bpfwl.BpfWorkload + dnsResolverController *dnsController } -func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfMonitor bool) *Controller { +func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfMonitor bool) (*Controller, error) { + processor := NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps) + dnsResolverController, err := NewDnsController(processor.WorkloadCache) + if err != nil { + return nil, fmt.Errorf("failed to create DNS resolver for Dual-Engine mode: %w", err) + } + processor.DnsResolverChan = dnsResolverController.workloadsChan + processor.ResolvedDomainChanMap = dnsResolverController.ResolvedDomainChanMap + + // Set up callback to clean DNS cache when workload is deleted + processor.onWorkloadDeleted = func(workloadName string) { + dnsResolverController.removeWorkloadFromDnsCache(workloadName) + } + c := &Controller{ - Processor: NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps), - bpfWorkloadObj: bpfWorkload, + Processor: processor, + bpfWorkloadObj: bpfWorkload, + dnsResolverController: dnsResolverController, } // do some initialization when restart // restore endpoint index, otherwise endpoint number can double @@ -63,10 +78,10 @@ func NewController(bpfWorkload *bpfwl.BpfWorkload, enableMonitoring, enablePerfM c.OperationMetricController = telemetry.NewBpfProgMetric() c.MapMetricController = telemetry.NewMapMetric() } - return c + return c, nil } -func (c *Controller) Run(ctx context.Context) error { +func (c *Controller) Run(ctx context.Context, stopCh <-chan struct{}) error { if err := c.Processor.PrepareDNSProxy(); err != nil { log.Errorf("failed to prepare for dns proxy, err: %+v", err) return err @@ -93,6 +108,9 @@ func (c *Controller) Run(ctx context.Context) error { if c.OperationMetricController != nil { go c.OperationMetricController.Run(ctx, c.bpfWorkloadObj.SockConn.KmPerfInfo) } + if c.dnsResolverController != nil { + go c.dnsResolverController.Run(stopCh) + } return nil } diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index e181f7d65..c9bdfcb80 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -23,6 +23,7 @@ import ( "sort" "strings" "sync" + "time" service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/protobuf/proto" @@ -31,7 +32,6 @@ import ( "istio.io/istio/pkg/util/sets" "kmesh.net/kmesh/api/v2/workloadapi" - "kmesh.net/kmesh/api/v2/workloadapi/security" security_v2 "kmesh.net/kmesh/api/v2/workloadapi/security" bpf2go "kmesh.net/kmesh/bpf/kmesh/bpf2go/dualengine" "kmesh.net/kmesh/pkg/auth" @@ -40,7 +40,6 @@ import ( "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/controller/config" "kmesh.net/kmesh/pkg/controller/telemetry" - "kmesh.net/kmesh/pkg/controller/workload/bpfcache" bpf "kmesh.net/kmesh/pkg/controller/workload/bpfcache" "kmesh.net/kmesh/pkg/controller/workload/cache" "kmesh.net/kmesh/pkg/nets" @@ -49,6 +48,8 @@ import ( const ( KmeshWaypointPort = 15019 // use this fixed port instead of the HboneMtlsPort in kmesh + // dnsResolveTimeout is the timeout used when waiting for DNS resolution for workloads + dnsResolveTimeout = 3 * time.Second ) type Processor struct { @@ -74,6 +75,11 @@ type Processor struct { authzRespOnce sync.Once handlers map[string][]func(resp *service_discovery_v3.DeltaDiscoveryResponse) error + + DnsResolverChan chan *workloadapi.Workload + ResolvedDomainChanMap map[string]chan *workloadapi.Workload + // Callback to remove workload from DNS cache when workload is deleted + onWorkloadDeleted func(workloadName string) } func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { @@ -223,7 +229,7 @@ func (p *Processor) storePodFrontendData(uid uint32, ip []byte) error { nets.CopyIpByteFromSlice(&fk.Ip, ip) fv.UpstreamId = uid if err := p.bpf.FrontendUpdate(&fk, &fv); err != nil { - return fmt.Errorf("Update frontend map failed, err:%s", err) + return fmt.Errorf("update frontend map failed, err:%s", err) } return nil @@ -246,6 +252,12 @@ func (p *Processor) removeWorkload(uid string) error { if wl == nil { return nil } + + // Clean up DNS cache for this workload if it was pending DNS resolution + if p.onWorkloadDeleted != nil && wl.GetName() != "" { + p.onWorkloadDeleted(wl.GetName()) + } + p.WorkloadCache.DeleteWorkload(uid) telemetry.DeleteWorkloadMetric(wl) return p.removeWorkloadFromBpfMap(wl) @@ -388,7 +400,7 @@ func (p *Processor) removeServiceResourceFromBpfMap(svc *workloadapi.Service, na } // addWorkloadToService update service & endpoint bpf map when a workload has new bound services -func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, workloadUid uint32, priority uint32) (error, bpf.EndpointKey) { +func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValue, workloadUid uint32, priority uint32) (bpf.EndpointKey, error) { var ( ek = bpf.EndpointKey{} ev = bpf.EndpointValue{} @@ -401,14 +413,14 @@ func (p *Processor) addWorkloadToService(sk *bpf.ServiceKey, sv *bpf.ServiceValu ev.BackendUid = workloadUid if err := p.bpf.EndpointUpdate(&ek, &ev); err != nil { log.Errorf("Update endpoint map failed, err:%s", err) - return err, ek + return ek, err } p.EndpointCache.AddEndpointToService(cache.Endpoint{ServiceId: ek.ServiceId, Prio: ek.Prio, BackendIndex: ek.BackendIndex}, ev.BackendUid) if err := p.bpf.ServiceUpdate(sk, sv); err != nil { log.Errorf("Update ServiceUpdate map failed, err:%s", err) - return err, ek + return ek, err } - return nil, ek + return ek, nil } // handleWorkloadUnboundServices handles when a workload's belonging services removed @@ -440,7 +452,7 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa if err := p.bpf.ServiceLookup(&sk, &sv); err == nil { if sv.LbPolicy == uint32(workloadapi.LoadBalancing_UNSPECIFIED_MODE) { // random mode // In random mode, we save all workload to max priority group - if err, _ = p.addWorkloadToService(&sk, &sv, workloadId, 0); err != nil { + if _, err = p.addWorkloadToService(&sk, &sv, workloadId, 0); err != nil { log.Errorf("addWorkloadToService workload %d service %d failed: %v", workloadId, sk.ServiceId, err) return err } @@ -448,7 +460,7 @@ func (p *Processor) handleWorkloadNewBoundServices(workload *workloadapi.Workloa service := p.ServiceCache.GetService(p.hashName.NumToStr(svcUid)) if p.locality.LocalityInfo != nil && service != nil { prio := p.locality.CalcLocalityLBPrio(workload, service.LoadBalancing.GetRoutingPreference()) - if err, _ = p.addWorkloadToService(&sk, &sv, workloadId, prio); err != nil { + if _, err = p.addWorkloadToService(&sk, &sv, workloadId, prio); err != nil { log.Errorf("addWorkloadToService workload %d service %d priority %d failed: %v", workloadId, sk.ServiceId, prio, err) return err } @@ -590,7 +602,7 @@ func (p *Processor) handleWorkload(workload *workloadapi.Workload) error { // Because there is only one address in the workload, a direct comparison can be made to // determine whether the old data needs to be deleted or not. - if !slices.Equal(newWorkloadAddresses[0], oldWorkloadAddresses[0]) { + if len(newWorkloadAddresses) > 0 && len(oldWorkloadAddresses) > 0 && !slices.Equal(newWorkloadAddresses[0], oldWorkloadAddresses[0]) { err := p.deleteFrontendByIp(oldWorkloadAddresses) if err != nil { return fmt.Errorf("frontend map delete failed: %v", err) @@ -687,10 +699,10 @@ func (p *Processor) updateEndpointOneByOne(serviceId uint32, epsUpdate []cache.E } // add ek first to another priority group - if err, _ := p.addWorkloadToService(&sKey, &sValue, ev.BackendUid, prio); err != nil { + if _, err := p.addWorkloadToService(&sKey, &sValue, ev.BackendUid, prio); err != nil { return fmt.Errorf("update endpoint %d priority to %d failed: %v", ev.BackendUid, prio, err) } - epKeys := []bpfcache.EndpointKey{ek} + epKeys := []bpf.EndpointKey{ek} // delete ek from old priority group if err := p.deleteEndpointRecords(epKeys); err != nil { return fmt.Errorf("delete endpoint %d from old priority group %d failed: %v", ev.BackendUid, ek.Prio, err) @@ -747,7 +759,7 @@ func (p *Processor) updateServiceMap(service, oldService *workloadapi.Service) e if strings.Contains(serviceName, "waypoint") { newServiceInfo.TargetPort[i] = nets.ConvertPortToBigEndian(KmeshWaypointPort) } else if port.TargetPort == 0 { - // NOTE: Target port could be unset in servicen entry, in which case it should + // NOTE: Target port could be unset in service entry, in which case it should // be consistent with the Service Port. newServiceInfo.TargetPort[i] = nets.ConvertPortToBigEndian(port.ServicePort) } else { @@ -835,7 +847,7 @@ func (p *Processor) handleService(service *workloadapi.Service) error { // Preprocess service, remove the waypoint from waypoint service, otherwise it will fall into a loop in bpf if service.Waypoint != nil && service.GetWaypoint().GetAddress() != nil && len(service.Addresses) != 0 { // Currently istiod only set the waypoint address to the first address of the service - // When waypoints of different granularities are deployed together, the only waypoint service to be determined + // when waypoints of different granularities are deployed together, the only waypoint service to be determined // is whether it contains port 15021, ref: https://github.com/kmesh-net/kmesh/issues/691 // TODO: remove when upstream istiod will not set the waypoint address for itself if slices.Equal(service.GetWaypoint().GetAddress().Address, service.Addresses[0].Address) || containsPort(15021) { @@ -888,12 +900,12 @@ func (p *Processor) handleAddressTypeResponse(rsp *service_discovery_v3.DeltaDis // sort resources, first process services, then workload var services []*workloadapi.Service var workloads []*workloadapi.Workload + for _, resource := range rsp.GetResources() { address := &workloadapi.Address{} if err = anypb.UnmarshalTo(resource.Resource, address, proto.UnmarshalOptions{}); err != nil { continue } - switch address.GetType().(type) { case *workloadapi.Address_Workload: workloads = append(workloads, address.GetWorkload()) @@ -932,10 +944,33 @@ func (p *Processor) handleServicesAndWorkloads(services []*workloadapi.Service, } for _, workload := range workloads { - // TODO: Kmesh supports ServiceEntry if workload.GetAddresses() == nil { - log.Warnf("workload: %s/%s addresses is nil", workload.Namespace, workload.Name) - continue + if p.DnsResolverChan == nil { + log.Warnf("workload %s/%s has nil addresses but DNS resolver is disabled", workload.Namespace, workload.Name) + continue + } + + uid := workload.GetUid() + p.ResolvedDomainChanMap[uid] = make(chan *workloadapi.Workload) + p.DnsResolverChan <- workload + log.Infof("waiting for DNS resolution: %s/%s/%s", workload.Namespace, workload.Name, uid) + + select { + case <-time.After(dnsResolveTimeout): + log.Warnf("DNS resolution timeout for workload %s/%s/%s, skip handling", workload.Namespace, workload.Name, uid) + if ch, ok := p.ResolvedDomainChanMap[uid]; ok { + close(ch) + delete(p.ResolvedDomainChanMap, uid) + } + continue + case newWorkload := <-p.ResolvedDomainChanMap[uid]: + if newWorkload == nil || newWorkload.GetAddresses() == nil { + log.Warnf("workload %s/%s resolved addresses is nil, skip handling", workload.Namespace, workload.Name) + continue + } + log.Infof("workload %s/%s addresses resolved: %v", newWorkload.Namespace, newWorkload.Name, newWorkload.Addresses) + workload = newWorkload + } } if err := p.handleWorkload(workload); err != nil { @@ -988,11 +1023,11 @@ func (p *Processor) handleRemovedAddressesDuringRestart() { func (p *Processor) handleAuthorizationTypeResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) error { if rbac == nil { - return fmt.Errorf("Rbac module uninitialized") + return fmt.Errorf("rbac module uninitialized") } // update resource for _, resource := range rsp.GetResources() { - authPolicy := &security.Authorization{} + authPolicy := &security_v2.Authorization{} if err := anypb.UnmarshalTo(resource.Resource, authPolicy, proto.UnmarshalOptions{}); err != nil { log.Errorf("unmarshal failed, err: %v", err) continue diff --git a/pkg/controller/workload/workload_processor_test.go b/pkg/controller/workload/workload_processor_test.go index 4aac25b5a..cdabb7121 100644 --- a/pkg/controller/workload/workload_processor_test.go +++ b/pkg/controller/workload/workload_processor_test.go @@ -359,7 +359,8 @@ func BenchmarkAddNewServicesWithWorkload(b *testing.B) { cleanup, bpfLoader := test.InitBpfMap(t, config) b.Cleanup(cleanup) - workloadController := NewController(bpfLoader.GetBpfWorkload(), false, false) + workloadController, err := NewController(bpfLoader.GetBpfWorkload(), false, false) + assert.NoError(t, err) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/test/e2e/baseline_test.go b/test/e2e/baseline_test.go index 5b5b2431b..ad5580b14 100644 --- a/test/e2e/baseline_test.go +++ b/test/e2e/baseline_test.go @@ -236,6 +236,72 @@ spec: }) } +// TestServiceEntryDNSResolution tests accessing a service through a fake hostname configured in ServiceEntry. +// This simulates external service access using DNS resolution. +func TestServiceEntryDNSResolution(t *testing.T) { + runTest(t, func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions) { + if opt.Scheme != scheme.HTTP { + return + } + if !dst.Config().HasServiceAddressedWaypointProxy() { + return + } + + // Skip this test in IPv6-only environment as DNS proxy is disabled for IPv6 + // (see test/e2e/run_test.sh: "skip dns proxy for ipv6") + // Without DNS proxy, fake hostname resolution won't work + v4, v6 := getSupportedIPFamilies(t) + if !v4 && v6 { + t.Skip("Skipping DNS resolution test in IPv6-only environment (DNS proxy disabled)") + return + } + + const ( + serviceEntryName = "external-svc-dns" + fakeHostname = "foo.bar.com" + ) + + // Use enrolled-to-kmesh service as the actual backend for ServiceEntry + backendService := apps.EnrolledToKmesh[0].Config().Service + "." + apps.Namespace.Name() + ".svc.cluster.local" + servicePort := dst.Config().Ports.MustForName("http").ServicePort + + // Create ServiceEntry with fake hostname pointing to enrolled-to-kmesh service via DNS resolution + t.ConfigIstio().Eval(apps.Namespace.Name(), map[string]string{ + "ServiceEntryName": serviceEntryName, + "FakeHostname": fakeHostname, + "Backend": backendService, + "ServicePort": fmt.Sprintf("%d", servicePort), + }, `apiVersion: networking.istio.io/v1alpha3 +kind: ServiceEntry +metadata: + name: "{{.ServiceEntryName}}" +spec: + hosts: + - "{{.FakeHostname}}" + location: MESH_EXTERNAL + ports: + - number: {{.ServicePort}} + name: http + protocol: HTTP + resolution: DNS + endpoints: + - address: "{{.Backend}}" +`).ApplyOrFail(t) + + // Set waypoint for the ServiceEntry + SetWaypoint(t, apps.Namespace.Name(), serviceEntryName, "waypoint", ServiceEntry) + + // Test accessing the backend service through the fake hostname + opt = opt.DeepCopy() + opt.Count = 5 + opt.Timeout = time.Second * 10 + opt.Address = fakeHostname + opt.Port = echo.Port{ServicePort: servicePort} + opt.Check = check.OK() + src.CallOrFail(t, opt) + }) +} + func TestTrafficSplit(t *testing.T) { runTest(t, func(t framework.TestContext, src echo.Instance, dst echo.Instance, opt echo.CallOptions) { // Need at least one waypoint proxy and HTTP