diff --git a/internal/xds/resolver/watch_service_test.go b/internal/xds/resolver/watch_service_test.go index 1c68eba32013..6eec8a2b7429 100644 --- a/internal/xds/resolver/watch_service_test.go +++ b/internal/xds/resolver/watch_service_test.go @@ -68,10 +68,14 @@ func (s) TestServiceWatch_ListenerPointsToNewRouteConfiguration(t *testing.T) { verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(resources.Clusters[0].Name)) // Update the listener resource to point to a new route configuration name. - // Leave the old route configuration resource unchanged. + // The old route configuration resource is left unchanged to prevent a race: + // if it were removed immediately, the resolver might encounter a + // "resource-not-found" error for the old route before the listener update + // successfully transitions the client to the new route. newTestRouteConfigName := defaultTestRouteConfigName + "-new" resources.Listeners = []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, newTestRouteConfigName)} - configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes) + resources.SkipValidation = true + mgmtServer.Update(ctx, resources) // Verify that the new route configuration resource is requested. waitForResourceNames(ctx, t, routeCfgCh, []string{newTestRouteConfigName}) @@ -88,9 +92,11 @@ func (s) TestServiceWatch_ListenerPointsToNewRouteConfiguration(t *testing.T) { }, }, }) - configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, resources.Listeners, resources.Routes) + mgmtServer.Update(ctx, resources) - // Wait for no update from the resolver. + // Wait for no update from the resolver since the listener resource no + // longer points to the old route resource and new route resource hasn't + // been sent yet. verifyNoUpdateFromResolver(ctx, t, stateCh) // Update the management server with the new route configuration resource. diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 44b7dd4b7b54..a38b292b006d 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -230,8 +230,12 @@ type xdsResolver struct { curConfigSelector stoppableConfigSelector } -// ResolveNow is a no-op at this point. -func (*xdsResolver) ResolveNow(resolver.ResolveNowOptions) {} +// ResolveNow calls RequestDNSReresolution on the dependency manager. +func (r *xdsResolver) ResolveNow(opts resolver.ResolveNowOptions) { + if r.dm != nil { + r.dm.RequestDNSReresolution(opts) + } +} func (r *xdsResolver) Close() { // Cancel the context passed to the serializer and wait for any scheduled diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index 16d6fd76b1f9..b88e7ae4dc8f 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -436,14 +436,17 @@ func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) { stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) - // Configure good listener and route configuration resources on the - // management server. 
- listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} - routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} - configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + // Configure all resources on the management server. + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + }) + mgmtServer.Update(ctx, resources) // Expect a good update from the resolver. - cs := verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(defaultTestClusterName)) + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(resources.Clusters[0].Name)) // "Make an RPC" by invoking the config selector. _, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) @@ -1001,7 +1004,9 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { Port: defaultTestPort[0], SecLevel: e2e.SecurityLevelNone, }) - mgmtServer.Update(ctx, resources) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) diff --git a/internal/xds/xdsdepmgr/watch_service.go b/internal/xds/xdsdepmgr/watch_service.go deleted file mode 100644 index 73f1b703519f..000000000000 --- a/internal/xds/xdsdepmgr/watch_service.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2025 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package xdsdepmgr - -import "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" - -type listenerWatcher struct { - resourceName string - cancel func() - depMgr *DependencyManager -} - -func newListenerWatcher(resourceName string, depMgr *DependencyManager) *listenerWatcher { - lw := &listenerWatcher{resourceName: resourceName, depMgr: depMgr} - lw.cancel = xdsresource.WatchListener(depMgr.xdsClient, resourceName, lw) - return lw -} - -func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { - l.depMgr.onListenerResourceUpdate(update, onDone) -} - -func (l *listenerWatcher) ResourceError(err error, onDone func()) { - l.depMgr.onListenerResourceError(err, onDone) -} - -func (l *listenerWatcher) AmbientError(err error, onDone func()) { - l.depMgr.onListenerResourceAmbientError(err, onDone) -} - -func (l *listenerWatcher) stop() { - l.cancel() - if l.depMgr.logger.V(2) { - l.depMgr.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) - } -} - -type routeConfigWatcher struct { - resourceName string - cancel func() - depMgr *DependencyManager -} - -func newRouteConfigWatcher(resourceName string, depMgr *DependencyManager) *routeConfigWatcher { - rw := &routeConfigWatcher{resourceName: resourceName, depMgr: depMgr} - rw.cancel = xdsresource.WatchRouteConfig(depMgr.xdsClient, resourceName, rw) - return rw -} - -func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) { - r.depMgr.onRouteConfigResourceUpdate(r.resourceName, u, onDone) -} - -func (r *routeConfigWatcher) ResourceError(err error, onDone func()) { - r.depMgr.onRouteConfigResourceError(r.resourceName, err, onDone) -} - -func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { - r.depMgr.onRouteConfigResourceAmbientError(r.resourceName, err, onDone) -} - -func (r *routeConfigWatcher) stop() { - r.cancel() - if r.depMgr.logger.V(2) { - r.depMgr.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) - } -} diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager.go b/internal/xds/xdsdepmgr/xds_dependency_manager.go index fae5ca962d42..266516bd27d8 100644 --- a/internal/xds/xdsdepmgr/xds_dependency_manager.go +++ b/internal/xds/xdsdepmgr/xds_dependency_manager.go @@ -19,19 +19,28 @@ package xdsdepmgr import ( + "context" "fmt" + "net/url" "sync" "google.golang.org/grpc/grpclog" internalgrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" ) const prefix = "[xdsdepmgr %p] " var logger = grpclog.Component("xds") +// EnableClusterAndEndpointsWatch is a flag used to control whether the CDS/EDS +// watchers in the dependency manager should be used. +var EnableClusterAndEndpointsWatch = false + func prefixLogger(p *DependencyManager) *internalgrpclog.PrefixLogger { return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) } @@ -56,6 +65,40 @@ type ConfigWatcher interface { Error(error) } +// xdsResourceState is a generic struct to hold the state of a watched xDS +// resource. 
+type xdsResourceState[T any, U any] struct { + lastUpdate *T + lastErr error + updateReceived bool + stop func() + extras U // to store any additional state specific to the watcher +} + +func (x *xdsResourceState[T, U]) setLastUpdate(update *T) { + x.lastUpdate = update + x.updateReceived = true + x.lastErr = nil +} + +func (x *xdsResourceState[T, U]) setLastError(err error) { + x.lastErr = err + x.updateReceived = true + x.lastUpdate = nil +} + +func (x *xdsResourceState[T, U]) updateLastError(err error) { + x.lastErr = err +} + +type dnsExtras struct { + dnsR resolver.Resolver +} + +type routeExtras struct { + virtualHost *xdsresource.VirtualHost +} + // DependencyManager registers watches on the xDS client for all required xDS // resources, resolves dependencies between them, and returns a complete // configuration to the xDS resolver. @@ -69,16 +112,23 @@ type DependencyManager struct { dataplaneAuthority string nodeID string - // All the fields below are protected by mu. - mu sync.Mutex - stopped bool + // Used to serialize callbacks from DNS resolvers to avoid deadlocks. Since + // the resolver's Build() is called with the dependency manager lock held, + // direct callbacks to ClientConn (which also require that lock) would + // deadlock. + dnsSerializer *grpcsync.CallbackSerializer + dnsSerializerCancel func() - listenerWatcher *listenerWatcher - currentListenerUpdate *xdsresource.ListenerUpdate - routeConfigWatcher *routeConfigWatcher - rdsResourceName string - currentRouteConfig *xdsresource.RouteConfigUpdate - currentVirtualHost *xdsresource.VirtualHost + // All the fields below are protected by mu. + mu sync.Mutex + stopped bool + listenerWatcher *xdsResourceState[xdsresource.ListenerUpdate, struct{}] + rdsResourceName string + routeConfigWatcher *xdsResourceState[xdsresource.RouteConfigUpdate, routeExtras] + clustersFromRouteConfig map[string]bool + clusterWatchers map[string]*xdsResourceState[xdsresource.ClusterUpdate, struct{}] + endpointWatchers map[string]*xdsResourceState[xdsresource.EndpointsUpdate, struct{}] + dnsResolvers map[string]*xdsResourceState[xdsresource.DNSUpdate, dnsExtras] } // New creates a new DependencyManager. @@ -91,18 +141,37 @@ type DependencyManager struct { // - watcher is the ConfigWatcher interface that will receive the aggregated // XDSConfig updates and errors. func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, watcher ConfigWatcher) *DependencyManager { + ctx, cancel := context.WithCancel(context.Background()) dm := &DependencyManager{ - ldsResourceName: listenerName, - dataplaneAuthority: dataplaneAuthority, - xdsClient: xdsClient, - watcher: watcher, - nodeID: xdsClient.BootstrapConfig().Node().GetId(), + ldsResourceName: listenerName, + dataplaneAuthority: dataplaneAuthority, + xdsClient: xdsClient, + watcher: watcher, + nodeID: xdsClient.BootstrapConfig().Node().GetId(), + dnsSerializer: grpcsync.NewCallbackSerializer(ctx), + dnsSerializerCancel: cancel, + clustersFromRouteConfig: make(map[string]bool), + endpointWatchers: make(map[string]*xdsResourceState[xdsresource.EndpointsUpdate, struct{}]), + dnsResolvers: make(map[string]*xdsResourceState[xdsresource.DNSUpdate, dnsExtras]), + clusterWatchers: make(map[string]*xdsResourceState[xdsresource.ClusterUpdate, struct{}]), } dm.logger = prefixLogger(dm) // Start the listener watch. Listener watch will start the other resource // watches as needed. 
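+	// The listener watch is registered through the generic xdsResourceWatcher,
+	// whose closures forward callbacks to the listener-specific handlers below.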
- dm.listenerWatcher = newListenerWatcher(listenerName, dm) + dm.listenerWatcher = &xdsResourceState[xdsresource.ListenerUpdate, struct{}]{} + lw := &xdsResourceWatcher[xdsresource.ListenerUpdate]{ + onUpdate: func(update *xdsresource.ListenerUpdate, onDone func()) { + dm.onListenerResourceUpdate(update, onDone) + }, + onError: func(err error, onDone func()) { + dm.onListenerResourceError(err, onDone) + }, + onAmbientError: func(err error, onDone func()) { + dm.onListenerResourceAmbientError(err, onDone) + }, + } + dm.listenerWatcher.stop = xdsresource.WatchListener(dm.xdsClient, listenerName, lw) return dm } @@ -116,12 +185,27 @@ func (m *DependencyManager) Close() { } m.stopped = true - if m.listenerWatcher != nil { - m.listenerWatcher.stop() - } + m.listenerWatcher.stop() if m.routeConfigWatcher != nil { m.routeConfigWatcher.stop() } + for name, cluster := range m.clusterWatchers { + cluster.stop() + delete(m.clusterWatchers, name) + } + + for name, endpoint := range m.endpointWatchers { + endpoint.stop() + delete(m.endpointWatchers, name) + } + + // We cannot wait for the dns serializer to finish here, as the callbacks + // try to grab the dependency manager lock, which is already held here. + m.dnsSerializerCancel() + for name, dnsResolver := range m.dnsResolvers { + dnsResolver.stop() + delete(m.dnsResolvers, name) + } } // annotateErrorWithNodeID annotates the given error with the provided xDS node @@ -130,25 +214,263 @@ func (m *DependencyManager) annotateErrorWithNodeID(err error) error { return fmt.Errorf("[xDS node id: %v]: %v", m.nodeID, err) } -// maybeSendUpdateLocked checks that all the resources have been received and sends -// the current aggregated xDS configuration to the watcher if all the updates -// are available. +// maybeSendUpdateLocked verifies that all expected resources have been +// received, and if so, delivers the complete xDS configuration to the watcher. func (m *DependencyManager) maybeSendUpdateLocked() { - m.watcher.Update(&xdsresource.XDSConfig{ - Listener: m.currentListenerUpdate, - RouteConfig: m.currentRouteConfig, - VirtualHost: m.currentVirtualHost, - }) + if m.listenerWatcher.lastUpdate == nil || m.routeConfigWatcher == nil || m.routeConfigWatcher.lastUpdate == nil { + return + } + config := &xdsresource.XDSConfig{ + Listener: m.listenerWatcher.lastUpdate, + RouteConfig: m.routeConfigWatcher.lastUpdate, + VirtualHost: m.routeConfigWatcher.extras.virtualHost, + Clusters: make(map[string]*xdsresource.ClusterResult), + } + + if !EnableClusterAndEndpointsWatch { + m.watcher.Update(config) + return + } + + edsResourcesSeen := make(map[string]bool) + dnsResourcesSeen := make(map[string]bool) + clusterResourcesSeen := make(map[string]bool) + haveAllResources := true + for cluster := range m.clustersFromRouteConfig { + ok, leafClusters, err := m.populateClusterConfigLocked(cluster, 0, config.Clusters, edsResourcesSeen, dnsResourcesSeen, clusterResourcesSeen) + if !ok { + haveAllResources = false + } + // If there are no leaf clusters, add that as error. + if ok && len(leafClusters) == 0 { + config.Clusters[cluster] = &xdsresource.ClusterResult{Err: m.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph has no leaf clusters"))} + } + if err != nil { + config.Clusters[cluster] = &xdsresource.ClusterResult{Err: err} + } + } + + // Cancel resources not seen in the tree. 
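+	// These are watches for clusters, endpoints and DNS targets that are no
+	// longer reachable from the current route configuration; their state is
+	// dropped along with the watch.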
+	for name, ep := range m.endpointWatchers {
+		if _, ok := edsResourcesSeen[name]; !ok {
+			ep.stop()
+			delete(m.endpointWatchers, name)
+		}
+	}
+	for name, dr := range m.dnsResolvers {
+		if _, ok := dnsResourcesSeen[name]; !ok {
+			dr.stop()
+			delete(m.dnsResolvers, name)
+		}
+	}
+	for name, cluster := range m.clusterWatchers {
+		if _, ok := clusterResourcesSeen[name]; !ok {
+			cluster.stop()
+			delete(m.clusterWatchers, name)
+		}
+	}
+	if haveAllResources {
+		m.watcher.Update(config)
+	}
+}
+
+// populateClusterConfigLocked resolves and populates the
+// configuration for the given cluster and its children, including its
+// associated endpoint or aggregate children. For aggregate clusters, it
+// recursively resolves the configuration for its child clusters.
+//
+// This function traverses the cluster dependency graph (e.g., from an Aggregate
+// cluster down to its leaf clusters and their endpoints/DNS resources) to
+// ensure all necessary xDS resources are watched and fully resolved before
+// configuration is considered ready.
+//
+// Parameters:
+//
+//	clusterName: The name of the cluster resource to resolve.
+//	depth: The current recursion depth.
+//	clusterConfigs: Map to store the resolved cluster configuration.
+//	endpointResourcesSeen: Stores which EDS resource names have been encountered.
+//	dnsResourcesSeen: Stores which DNS resource names have been encountered.
+//	clustersSeen: Stores which cluster resource names have been encountered.
+//
+// Returns:
+//
+//	bool: Returns true if the cluster configuration (and all its
+//	      dependencies) is fully resolved (i.e., either an update or an
+//	      error has been received).
+//	[]string: A slice of all "leaf" cluster names discovered in the
+//	          traversal starting from `clusterName`. For
+//	          non-aggregate clusters, this will contain only `clusterName`.
+//	error: Error that needs to be propagated up the tree (like
+//	       max depth exceeded or an error propagated from a
+//	       child cluster).
+func (m *DependencyManager) populateClusterConfigLocked(clusterName string, depth int, clusterConfigs map[string]*xdsresource.ClusterResult, endpointResourcesSeen, dnsResourcesSeen, clustersSeen map[string]bool) (bool, []string, error) {
+	const aggregateClusterMaxDepth = 16
+	clustersSeen[clusterName] = true
+
+	if depth >= aggregateClusterMaxDepth {
+		err := m.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph exceeds max depth (%d)", aggregateClusterMaxDepth))
+		clusterConfigs[clusterName] = &xdsresource.ClusterResult{Err: err}
+		return true, nil, err
+	}
+
+	// If cluster is already seen in the tree, return.
+	if _, ok := clusterConfigs[clusterName]; ok {
+		return true, nil, nil
+	}
+
+	// If cluster watcher does not exist, create one.
+	state, ok := m.clusterWatchers[clusterName]
+	if !ok {
+		m.clusterWatchers[clusterName] = newClusterWatcher(clusterName, m)
+		return false, nil, nil
+	}
+
+	// If a watch exists but no update received yet, return.
+	if !state.updateReceived {
+		return false, nil, nil
+	}
+
+	// If there was a resource error, propagate it up.
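+	// The caller records the error against the parent cluster(s), so it ends
+	// up in the ClusterResult of whichever cluster references this one.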
+ if state.lastErr != nil { + return true, nil, state.lastErr + } + + clusterConfigs[clusterName] = &xdsresource.ClusterResult{ + Config: xdsresource.ClusterConfig{ + Cluster: state.lastUpdate, + }, + } + update := state.lastUpdate + + switch update.ClusterType { + case xdsresource.ClusterTypeEDS: + return m.populateEDSClusterLocked(clusterName, update, clusterConfigs, endpointResourcesSeen) + case xdsresource.ClusterTypeLogicalDNS: + return m.populateLogicalDNSClusterLocked(clusterName, update, clusterConfigs, dnsResourcesSeen) + case xdsresource.ClusterTypeAggregate: + return m.populateAggregateClusterLocked(clusterName, update, depth, clusterConfigs, endpointResourcesSeen, dnsResourcesSeen, clustersSeen) + default: + clusterConfigs[clusterName] = &xdsresource.ClusterResult{Err: m.annotateErrorWithNodeID(fmt.Errorf("cluster type %v of cluster %s not supported", update.ClusterType, clusterName))} + return true, nil, nil + } +} + +func (m *DependencyManager) populateEDSClusterLocked(clusterName string, update *xdsresource.ClusterUpdate, clusterConfigs map[string]*xdsresource.ClusterResult, endpointResourcesSeen map[string]bool) (bool, []string, error) { + edsName := clusterName + if update.EDSServiceName != "" { + edsName = update.EDSServiceName + } + endpointResourcesSeen[edsName] = true + + // If endpoint watcher does not exist, create one. + if _, ok := m.endpointWatchers[edsName]; !ok { + m.endpointWatchers[edsName] = newEndpointWatcher(edsName, m) + return false, nil, nil + } + endpointState := m.endpointWatchers[edsName] + + // If the resource does not have any update yet, return. + if !endpointState.updateReceived { + return false, nil, nil + } + + // Store the update and error. + clusterConfigs[clusterName].Config.EndpointConfig = &xdsresource.EndpointConfig{ + EDSUpdate: endpointState.lastUpdate, + ResolutionNote: endpointState.lastErr, + } + return true, []string{clusterName}, nil +} + +func (m *DependencyManager) populateLogicalDNSClusterLocked(clusterName string, update *xdsresource.ClusterUpdate, clusterConfigs map[string]*xdsresource.ClusterResult, dnsResourcesSeen map[string]bool) (bool, []string, error) { + target := update.DNSHostName + dnsResourcesSeen[target] = true + + // If dns resolver does not exist, create one. + if _, ok := m.dnsResolvers[target]; !ok { + state := m.newDNSResolver(target) + if state == nil { + return false, nil, nil + } + m.dnsResolvers[target] = state + return false, nil, nil + } + dnsState := m.dnsResolvers[target] + + // If no update received, return false. 
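+	// The overall configuration is treated as incomplete until the first DNS
+	// result (success or error) arrives for this target.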
+	if !dnsState.updateReceived {
+		return false, nil, nil
+	}
+
+	clusterConfigs[clusterName].Config.EndpointConfig = &xdsresource.EndpointConfig{
+		DNSEndpoints:   dnsState.lastUpdate,
+		ResolutionNote: dnsState.lastErr,
+	}
+	return true, []string{clusterName}, nil
+}
+
+func (m *DependencyManager) populateAggregateClusterLocked(clusterName string, update *xdsresource.ClusterUpdate, depth int, clusterConfigs map[string]*xdsresource.ClusterResult, endpointResourcesSeen, dnsResourcesSeen, clustersSeen map[string]bool) (bool, []string, error) {
+	var leafClusters []string
+	haveAllResources := true
+	for _, child := range update.PrioritizedClusterNames {
+		ok, childLeafClusters, err := m.populateClusterConfigLocked(child, depth+1, clusterConfigs, endpointResourcesSeen, dnsResourcesSeen, clustersSeen)
+		if !ok {
+			haveAllResources = false
+		}
+		if err != nil {
+			clusterConfigs[clusterName] = &xdsresource.ClusterResult{Err: err}
+			return true, leafClusters, err
+		}
+		leafClusters = append(leafClusters, childLeafClusters...)
+	}
+	if !haveAllResources {
+		return false, leafClusters, nil
+	}
+	if haveAllResources && len(leafClusters) == 0 {
+		clusterConfigs[clusterName] = &xdsresource.ClusterResult{Err: m.annotateErrorWithNodeID(fmt.Errorf("aggregate cluster graph has no leaf clusters"))}
+		return true, leafClusters, nil
+	}
+	clusterConfigs[clusterName].Config.AggregateConfig = &xdsresource.AggregateConfig{
+		LeafClusters: leafClusters,
+	}
+	return true, leafClusters, nil
+}
 
 func (m *DependencyManager) applyRouteConfigUpdateLocked(update *xdsresource.RouteConfigUpdate) {
 	matchVH := xdsresource.FindBestMatchingVirtualHost(m.dataplaneAuthority, update.VirtualHosts)
 	if matchVH == nil {
-		m.watcher.Error(m.annotateErrorWithNodeID(fmt.Errorf("could not find VirtualHost for %q", m.dataplaneAuthority)))
+		err := m.annotateErrorWithNodeID(fmt.Errorf("could not find VirtualHost for %q", m.dataplaneAuthority))
+		m.routeConfigWatcher.setLastError(err)
+		m.watcher.Error(err)
 		return
 	}
-	m.currentRouteConfig = update
-	m.currentVirtualHost = matchVH
+	m.routeConfigWatcher.setLastUpdate(update)
+	m.routeConfigWatcher.extras.virtualHost = matchVH
+
+	if EnableClusterAndEndpointsWatch {
+		// Get the clusters to be watched from the routes in the virtual host.
+		// If the ClusterSpecifierField is set, we ignore it for now as the
+		// clusters will be determined dynamically for it.
+		newClusters := make(map[string]bool)
+
+		for _, rt := range matchVH.Routes {
+			for _, cluster := range rt.WeightedClusters {
+				newClusters[cluster.Name] = true
+			}
+		}
+		// Cancel watches for clusters not seen in the route config.
+		for name := range m.clustersFromRouteConfig {
+			if _, ok := newClusters[name]; !ok {
+				m.clusterWatchers[name].stop()
+				delete(m.clusterWatchers, name)
+			}
+		}
+
+		// Watches for new clusters are started in populateClusterConfigLocked
+		// to avoid repeating the code.
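+		// maybeSendUpdateLocked below walks clustersFromRouteConfig and creates
+		// watchers for any clusters that do not have one yet.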
+ m.clustersFromRouteConfig = newClusters + } m.maybeSendUpdateLocked() } @@ -165,7 +487,7 @@ func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.Listene m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update) } - m.currentListenerUpdate = update + m.listenerWatcher.setLastUpdate(update) if update.InlineRouteConfig != nil { // If there was a previous route config watcher because of a non-inline @@ -173,8 +495,8 @@ func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.Listene m.rdsResourceName = "" if m.routeConfigWatcher != nil { m.routeConfigWatcher.stop() - m.routeConfigWatcher = nil } + m.routeConfigWatcher = &xdsResourceState[xdsresource.RouteConfigUpdate, routeExtras]{stop: func() {}} m.applyRouteConfigUpdateLocked(update.InlineRouteConfig) return } @@ -194,9 +516,25 @@ func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.Listene m.rdsResourceName = update.RouteConfigName if m.routeConfigWatcher != nil { m.routeConfigWatcher.stop() - m.currentVirtualHost = nil } - m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m) + rw := &xdsResourceWatcher[xdsresource.RouteConfigUpdate]{ + onUpdate: func(update *xdsresource.RouteConfigUpdate, onDone func()) { + m.onRouteConfigResourceUpdate(m.rdsResourceName, update, onDone) + }, + onError: func(err error, onDone func()) { + m.onRouteConfigResourceError(m.rdsResourceName, err, onDone) + }, + onAmbientError: func(err error, onDone func()) { + m.onRouteConfigResourceAmbientError(m.rdsResourceName, err, onDone) + }, + } + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop = xdsresource.WatchRouteConfig(m.xdsClient, m.rdsResourceName, rw) + } else { + m.routeConfigWatcher = &xdsResourceState[xdsresource.RouteConfigUpdate, routeExtras]{ + stop: xdsresource.WatchRouteConfig(m.xdsClient, m.rdsResourceName, rw), + } + } } func (m *DependencyManager) onListenerResourceError(err error, onDone func()) { @@ -213,8 +551,8 @@ func (m *DependencyManager) onListenerResourceError(err error, onDone func()) { if m.routeConfigWatcher != nil { m.routeConfigWatcher.stop() } + m.listenerWatcher.setLastError(err) m.rdsResourceName = "" - m.currentVirtualHost = nil m.routeConfigWatcher = nil m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err))) } @@ -258,6 +596,7 @@ func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err if m.stopped || m.rdsResourceName != resourceName { return } + m.routeConfigWatcher.setLastError(err) m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err)) m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err))) } @@ -277,3 +616,268 @@ func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName strin m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err)) } + +func newClusterWatcher(resourceName string, depMgr *DependencyManager) *xdsResourceState[xdsresource.ClusterUpdate, struct{}] { + w := &xdsResourceWatcher[xdsresource.ClusterUpdate]{ + onUpdate: func(u *xdsresource.ClusterUpdate, onDone func()) { + depMgr.onClusterResourceUpdate(resourceName, u, onDone) + }, + onError: func(err error, onDone func()) { + depMgr.onClusterResourceError(resourceName, err, onDone) + }, + onAmbientError: func(err error, onDone func()) { + depMgr.onClusterAmbientError(resourceName, err, onDone) + }, + } + return 
&xdsResourceState[xdsresource.ClusterUpdate, struct{}]{
+		stop: xdsresource.WatchCluster(depMgr.xdsClient, resourceName, w),
+	}
+}
+
+// Records a successful Cluster resource update, clears any previous error.
+func (m *DependencyManager) onClusterResourceUpdate(resourceName string, update *xdsresource.ClusterUpdate, onDone func()) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	defer onDone()
+	if m.stopped || m.clusterWatchers[resourceName] == nil {
+		return
+	}
+
+	if m.logger.V(2) {
+		m.logger.Infof("Received update for Cluster resource %q: %+v", resourceName, update)
+	}
+	m.clusterWatchers[resourceName].setLastUpdate(update)
+	m.maybeSendUpdateLocked()
+}
+
+// Records a resource error for a Cluster resource, clears the last successful
+// update since we want to stop using the resource if we get a resource error.
+func (m *DependencyManager) onClusterResourceError(resourceName string, err error, onDone func()) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	defer onDone()
+	if m.stopped || m.clusterWatchers[resourceName] == nil {
+		return
+	}
+	m.logger.Warningf("Received resource error for Cluster resource %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+	m.clusterWatchers[resourceName].setLastError(err)
+	m.maybeSendUpdateLocked()
+}
+
+// Records the error in the state. The last successful update is retained
+// because it should continue to be used when an ambient error is received.
+func (m *DependencyManager) onClusterAmbientError(resourceName string, err error, onDone func()) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	defer onDone()
+	if m.stopped || m.clusterWatchers[resourceName] == nil {
+		return
+	}
+	m.logger.Warningf("Cluster resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+}
+
+func newEndpointWatcher(resourceName string, depMgr *DependencyManager) *xdsResourceState[xdsresource.EndpointsUpdate, struct{}] {
+	w := &xdsResourceWatcher[xdsresource.EndpointsUpdate]{
+		onUpdate: func(u *xdsresource.EndpointsUpdate, onDone func()) {
+			depMgr.onEndpointUpdate(resourceName, u, onDone)
+		},
+		onError: func(err error, onDone func()) {
+			depMgr.onEndpointResourceError(resourceName, err, onDone)
+		},
+		onAmbientError: func(err error, onDone func()) {
+			depMgr.onEndpointAmbientError(resourceName, err, onDone)
+		},
+	}
+	return &xdsResourceState[xdsresource.EndpointsUpdate, struct{}]{
+		stop: xdsresource.WatchEndpoints(depMgr.xdsClient, resourceName, w),
+	}
+}
+
+// Records a successful Endpoint resource update, clears any previous error from
+// the state.
+func (m *DependencyManager) onEndpointUpdate(resourceName string, update *xdsresource.EndpointsUpdate, onDone func()) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	defer onDone()
+	if m.stopped || m.endpointWatchers[resourceName] == nil {
+		return
+	}
+
+	if m.logger.V(2) {
+		m.logger.Infof("Received update for Endpoint resource %q: %+v", resourceName, update)
+	}
+	m.endpointWatchers[resourceName].setLastUpdate(update)
+	m.maybeSendUpdateLocked()
+}
+
+// Records a resource error and clears the last successful update since the
+// endpoints should not be used after getting a resource error.
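+// The cluster is still reported as resolved; the error is surfaced through the
+// EndpointConfig's ResolutionNote rather than blocking the aggregate update.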
+func (m *DependencyManager) onEndpointResourceError(resourceName string, err error, onDone func()) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	defer onDone()
+	if m.stopped || m.endpointWatchers[resourceName] == nil {
+		return
+	}
+	m.logger.Warningf("Received resource error for Endpoint resource %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+	m.endpointWatchers[resourceName].setLastError(err)
+	m.maybeSendUpdateLocked()
+}
+
+// Records the ambient error without clearing the last successful update, as the
+// endpoints should continue to be used.
+func (m *DependencyManager) onEndpointAmbientError(resourceName string, err error, onDone func()) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	defer onDone()
+	if m.stopped || m.endpointWatchers[resourceName] == nil {
+		return
+	}
+
+	m.logger.Warningf("Endpoint resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+	m.endpointWatchers[resourceName].updateLastError(err)
+	m.maybeSendUpdateLocked()
+}
+
+// Converts the DNS resolver state to an internal update, handling address-only
+// updates by wrapping them into endpoints. It records the update and clears any
+// previous error.
+func (m *DependencyManager) onDNSUpdate(resourceName string, update *resolver.State) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.stopped || m.dnsResolvers[resourceName] == nil {
+		return
+	}
+
+	if m.logger.V(2) {
+		m.logger.Infof("Received update from DNS resolver for resource %q: %+v", resourceName, update)
+	}
+	var endpoints []resolver.Endpoint
+	if len(update.Endpoints) == 0 {
+		endpoints = make([]resolver.Endpoint, len(update.Addresses))
+		for i, a := range update.Addresses {
+			endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{a}}
+			endpoints[i].Attributes = a.BalancerAttributes
+		}
+	}
+
+	m.dnsResolvers[resourceName].setLastUpdate(&xdsresource.DNSUpdate{Endpoints: endpoints})
+	m.maybeSendUpdateLocked()
+}
+
+// Records a DNS resolver error. It clears the last update only if no successful
+// update has been received yet, then triggers a dependency update.
+//
+// If a previous good update was received, the error is recorded but the
+// previous update is retained for continued use. Errors are suppressed if a
+// resource error was already received, as further propagation would have no
+// downstream effect.
+func (m *DependencyManager) onDNSError(resourceName string, err error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if m.stopped || m.dnsResolvers[resourceName] == nil {
+		return
+	}
+
+	err = fmt.Errorf("dns resolver error for target %q: %v", resourceName, m.annotateErrorWithNodeID(err))
+	m.logger.Warningf("%v", err)
+	state := m.dnsResolvers[resourceName]
+	if state.updateReceived {
+		state.updateLastError(err)
+		return
+	}
+
+	state.setLastError(err)
+	m.maybeSendUpdateLocked()
+}
+
+// RequestDNSReresolution calls ResolveNow on all the DNS resolvers.
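+// It is invoked from the xDS resolver's ResolveNow so that channel-level
+// re-resolution requests reach the DNS resolvers created for LOGICAL_DNS
+// clusters.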
+func (m *DependencyManager) RequestDNSReresolution(opt resolver.ResolveNowOptions) { + m.mu.Lock() + defer m.mu.Unlock() + for _, res := range m.dnsResolvers { + if res.extras.dnsR != nil { + res.extras.dnsR.ResolveNow(opt) + } + } +} + +type resolverClientConn struct { + target string + depMgr *DependencyManager +} + +func (rcc *resolverClientConn) UpdateState(state resolver.State) error { + rcc.depMgr.dnsSerializer.TrySchedule(func(context.Context) { + rcc.depMgr.onDNSUpdate(rcc.target, &state) + }) + return nil +} + +func (rcc *resolverClientConn) ReportError(err error) { + rcc.depMgr.dnsSerializer.TrySchedule(func(context.Context) { + rcc.depMgr.onDNSError(rcc.target, err) + }) +} + +func (rcc *resolverClientConn) NewAddress(addresses []resolver.Address) { + rcc.UpdateState(resolver.State{Addresses: addresses}) +} + +func (rcc *resolverClientConn) ParseServiceConfig(string) *serviceconfig.ParseResult { + return &serviceconfig.ParseResult{Err: fmt.Errorf("service config not supported")} +} + +func (m *DependencyManager) newDNSResolver(target string) *xdsResourceState[xdsresource.DNSUpdate, dnsExtras] { + rcc := &resolverClientConn{ + target: target, + depMgr: m, + } + u, err := url.Parse("dns:///" + target) + if err != nil { + err := fmt.Errorf("failed to parse DNS target %q: %v", target, m.annotateErrorWithNodeID(err)) + m.logger.Warningf("%v", err) + rcc.ReportError(err) + return &xdsResourceState[xdsresource.DNSUpdate, dnsExtras]{} + } + + r, err := resolver.Get("dns").Build(resolver.Target{URL: *u}, rcc, resolver.BuildOptions{}) + if err != nil { + rcc.ReportError(err) + err := fmt.Errorf("failed to build DNS resolver for target %q: %v", target, m.annotateErrorWithNodeID(err)) + m.logger.Warningf("%v", err) + return nil + } + + return &xdsResourceState[xdsresource.DNSUpdate, dnsExtras]{ + extras: dnsExtras{dnsR: r}, + stop: r.Close, + } +} + +// xdsResourceWatcher is a generic implementation of the xdsresource.Watcher +// interface. 
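+// It adapts the typed ResourceChanged/ResourceError/AmbientError callbacks to
+// plain closures, so a single type serves the Listener, RouteConfiguration,
+// Cluster and Endpoints watches registered by the dependency manager.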
+type xdsResourceWatcher[T any] struct {
+	onUpdate       func(*T, func())
+	onError        func(error, func())
+	onAmbientError func(error, func())
+}
+
+func (x *xdsResourceWatcher[T]) ResourceChanged(update *T, onDone func()) {
+	x.onUpdate(update, onDone)
+}
+
+func (x *xdsResourceWatcher[T]) ResourceError(err error, onDone func()) {
+	x.onError(err, onDone)
+}
+
+func (x *xdsResourceWatcher[T]) AmbientError(err error, onDone func()) {
+	x.onAmbientError(err, onDone)
+}
diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager_test.go b/internal/xds/xdsdepmgr/xds_dependency_manager_test.go
index 009010877adf..8cfd1aa924ec 100644
--- a/internal/xds/xdsdepmgr/xds_dependency_manager_test.go
+++ b/internal/xds/xdsdepmgr/xds_dependency_manager_test.go
@@ -21,10 +21,12 @@ package xdsdepmgr_test
 import (
 	"context"
 	"fmt"
+	"regexp"
 	"strings"
 	"testing"
 	"time"
 
+	xxhash "github.com/cespare/xxhash/v2"
 	"github.com/envoyproxy/go-control-plane/pkg/wellknown"
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
@@ -33,10 +35,16 @@ import (
 	"google.golang.org/grpc/internal/testutils"
 	"google.golang.org/grpc/internal/testutils/xds/e2e"
 	"google.golang.org/grpc/internal/xds/bootstrap"
+	"google.golang.org/grpc/internal/xds/clients"
 	"google.golang.org/grpc/internal/xds/xdsclient"
 	"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
 	"google.golang.org/grpc/internal/xds/xdsdepmgr"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/protobuf/types/known/wrapperspb"
 
+	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
 	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
 	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
 	v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
@@ -50,6 +58,7 @@ type s struct {
 }
 
 func Test(t *testing.T) {
+	xdsdepmgr.EnableClusterAndEndpointsWatch = true
 	grpctest.RunSubTests(t, s{})
 }
 
@@ -60,6 +69,7 @@ const (
 	defaultTestServiceName     = "service-name"
 	defaultTestRouteConfigName = "route-config-name"
 	defaultTestClusterName     = "cluster-name"
+	defaultTestEDSServiceName  = "eds-service-name"
 )
 
 func newStringP(s string) *string {
@@ -72,11 +82,20 @@ func newStringP(s string) *string {
 type testWatcher struct {
 	updateCh chan *xdsresource.XDSConfig
 	errorCh  chan error
+	done     chan struct{}
 }
 
-// Update sends the received XDSConfig update to the update channel.
+// Update sends the received XDSConfig update to the update channel. Updates
+// are not sent once the done channel is closed. The done channel is closed in
+// error cases because the management server keeps resending error updates,
+// which would cause the dependency manager to emit multiple updates and block
+// on the update channel.
 func (w *testWatcher) Update(cfg *xdsresource.XDSConfig) {
-	w.updateCh <- cfg
+	select {
+	case <-w.done:
+		return
+	case w.updateCh <- cfg:
+	}
 }
 
 // Error sends the received error to the error channel.
@@ -102,6 +121,25 @@ func verifyError(ctx context.Context, errCh chan error, wantErr, wantNodeID stri
 	return nil
 }
 
+// lessEndpoint determines a stable, canonical order for any two
+// resolver.Endpoint structs.
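+// It is intended for use with cmpopts.SortSlices in verifyXDSConfig so that
+// differences in endpoint ordering do not cause spurious diffs.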
+func lessEndpoint(a, b resolver.Endpoint) bool { + return getHash(a) < getHash(b) +} + +func getHash(e resolver.Endpoint) uint64 { + h := xxhash.New() + + // We iterate through all addresses to ensure the hash represents + // the full endpoint identity. + for _, addr := range e.Addresses { + h.Write([]byte(addr.Addr)) + h.Write([]byte(addr.ServerName)) + } + + return h.Sum64() +} + func verifyXDSConfig(ctx context.Context, xdsCh chan *xdsresource.XDSConfig, errCh chan error, want *xdsresource.XDSConfig) error { select { case <-ctx.Done(): @@ -112,6 +150,24 @@ func verifyXDSConfig(ctx context.Context, xdsCh chan *xdsresource.XDSConfig, err cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"), cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"), cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"), + cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy", "TelemetryLabels"), + cmpopts.IgnoreFields(xdsresource.EndpointsUpdate{}, "Raw"), + // Used for EndpointConfig.ResolutionNote and ClusterResult.Err fields. + cmp.Transformer("ErrorsToString", func(in error) string { + if in == nil { + return "" // Treat nil as an empty string + } + s := in.Error() + + // Replace all sequences of whitespace (including newlines and + // tabs) with a single standard space. + s = regexp.MustCompile(`\s+`).ReplaceAllString(s, " ") + + // Trim any leading/trailing space that might be left over and + // return error as string. + return strings.TrimSpace(s) + }), + cmpopts.SortSlices(lessEndpoint), } if diff := cmp.Diff(update, want, cmpOpts...); diff != "" { return fmt.Errorf("received unexpected update from dependency manager. Diff (-got +want):\n%v", diff) @@ -122,6 +178,75 @@ func verifyXDSConfig(ctx context.Context, xdsCh chan *xdsresource.XDSConfig, err return nil } +func makeXDSConfig(routeConfigName, clusterName, edsServiceName, addr string) *xdsresource.XDSConfig { + return &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: routeConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: clusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: clusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}, + }, + }, + Clusters: map[string]*xdsresource.ClusterResult{ + clusterName: { + Config: xdsresource.ClusterConfig{Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: clusterName, + EDSServiceName: edsServiceName, + }, + EndpointConfig: &xdsresource.EndpointConfig{ + EDSUpdate: &xdsresource.EndpointsUpdate{ + Localities: []xdsresource.Locality{ + {ID: clients.Locality{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: addr}}}, + HealthStatus: xdsresource.EndpointHealthStatusUnknown, + Weight: 1, + }, + }, + Weight: 1, + }, + }, + }, + }, + }, + }, + }, + } +} + +// setupManagementServerAndClient creates a management server, an xds client and +// 
returns the node ID, management server and xds client. +func setupManagementServerAndClient(t *testing.T, allowResourceSubset bool) (string, *e2e.ManagementServer, xdsclient.XDSClient) { + t.Helper() + nodeID := uuid.New().String() + mgmtServer, bootstrapContents := setupManagementServerForTest(t, nodeID, allowResourceSubset) + xdsClient := createXDSClient(t, bootstrapContents) + return nodeID, mgmtServer, xdsClient +} + func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient { t.Helper() @@ -142,28 +267,50 @@ func createXDSClient(t *testing.T, bootstrapContents []byte) xdsclient.XDSClient return c } -// Spins up an xDS management server and sets up the xDS bootstrap configuration. +// Spins up an xDS management server and sets up the xDS bootstrap +// configuration. // // Returns the following: // - A reference to the xDS management server // - Contents of the bootstrap configuration pointing to xDS management // server -func setupManagementServerForTest(t *testing.T, nodeID string) (*e2e.ManagementServer, []byte) { +func setupManagementServerForTest(t *testing.T, nodeID string, allowResourceSubset bool) (*e2e.ManagementServer, []byte) { t.Helper() - mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + AllowResourceSubset: allowResourceSubset, + }) t.Cleanup(mgmtServer.Stop) bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) return mgmtServer, bootstrapContents } +// makeAggregateClusterResource returns an aggregate cluster resource with the +// given name and list of child names. +func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster { + return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: name, + Type: e2e.ClusterTypeAggregate, + ChildNames: childNames, + }) +} + +// makeLogicalDNSClusterResource returns a LOGICAL_DNS cluster resource with the +// given name and given DNS host and port. +func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clusterpb.Cluster { + return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: name, + Type: e2e.ClusterTypeLogicalDNS, + DNSHostName: dnsHost, + DNSPort: dnsPort, + }) +} + // Tests the happy case where the dependency manager receives all the required -// resources and verifies that Update is called with with the correct XDSConfig. +// resources and verifies that Update is called with the correct XDSConfig. 
func (s) TestHappyCase(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), @@ -173,46 +320,20 @@ func (s) TestHappyCase(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) defer dm.Close() - wantXdsConfig := &xdsresource.XDSConfig{ - Listener: &xdsresource.ListenerUpdate{ - RouteConfigName: defaultTestRouteConfigName, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, - }, - RouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, - }, - }, - }, - VirtualHost: &xdsresource.VirtualHost{ - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute}, - }, - }, - } + wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Clusters[0].EdsClusterConfig.ServiceName, "localhost:8080") if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } @@ -221,9 +342,7 @@ func (s) TestHappyCase(t *testing.T) { // Tests the case where the listener contains an inline route configuration and // verifies that Update is called with the correct XDSConfig. 
func (s) TestInlineRouteConfig(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), @@ -249,9 +368,13 @@ func (s) TestInlineRouteConfig(t *testing.T) { }}, }}, } + cluster := e2e.DefaultCluster(defaultTestClusterName, defaultTestEDSServiceName, e2e.SecurityLevelNone) + endpoint := e2e.DefaultEndpoint(defaultTestEDSServiceName, "localhost", []uint32{8080}) resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listener}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoint}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { @@ -260,55 +383,20 @@ func (s) TestInlineRouteConfig(t *testing.T) { dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) defer dm.Close() - wantConfig := &xdsresource.XDSConfig{ - Listener: &xdsresource.ListenerUpdate{ - InlineRouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute}, - }, - }, - }, - }, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, - }, - RouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute}, - }, - }, - }, - }, - VirtualHost: &xdsresource.VirtualHost{ - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute}, - }, - }, - } - if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantConfig); err != nil { + wantXdsConfig := makeXDSConfig(defaultTestRouteConfigName, defaultTestClusterName, defaultTestEDSServiceName, "localhost:8080") + wantXdsConfig.Listener.InlineRouteConfig = wantXdsConfig.RouteConfig + wantXdsConfig.Listener.RouteConfigName = "" + + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } } // Tests the case where dependency manager only receives listener resource but -// does not receive route config resource. Verfies that Update is not called +// does not receive route config resource. Verifies that Update is not called // since we do not have all resources. 
-func (s) TestIncompleteResources(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) +func (s) TestNoRouteConfigResource(t *testing.T) { + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), @@ -342,10 +430,8 @@ func (s) TestIncompleteResources(t *testing.T) { // Tests the case where dependency manager receives a listener resource error by // sending the correct update first and then removing the listener resource. It // verifies that Error is called with the correct error. -func (s) TestListenerResourceError(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) +func (s) TestListenerResourceNotFoundError(t *testing.T) { + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), @@ -355,54 +441,27 @@ func (s) TestListenerResourceError(t *testing.T) { defer cancel() // Send a correct update first - listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - listener.FilterChains = nil - route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) defer dm.Close() - - wantXdsConfig := &xdsresource.XDSConfig{ - Listener: &xdsresource.ListenerUpdate{ - RouteConfigName: defaultTestRouteConfigName, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, - }, - RouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, - }, - }, - }, - VirtualHost: &xdsresource.VirtualHost{ - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, - }, - } + wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Clusters[0].EdsClusterConfig.ServiceName, "localhost:8080") if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } // Remove listener resource so that we get listener resource error. resources.Listeners = nil + resources.SkipValidation = true if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -412,18 +471,16 @@ func (s) TestListenerResourceError(t *testing.T) { } } -// Tests the case where dependency manager receives a route config resource -// error by sending a route resource that is NACKed by the XDSClient. 
It -// verifies that Error is called with correct error. -func (s) TestRouteResourceError(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) +// Tests the scenario where the Dependency Manager receives an invalid +// RouteConfiguration from the management server. The test provides a +// malformed resource to trigger a NACK, and verifies that the Dependency +// Manager propagates the resulting error via Error method. +func (s) TestRouteConfigResourceError(t *testing.T) { + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) - errorCh := make(chan error, 1) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), - errorCh: errorCh, + errorCh: make(chan error), } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -451,11 +508,11 @@ func (s) TestRouteResourceError(t *testing.T) { } } -// Tests the case where route config updates receives does not have any virtual -// host. Verifies that Error is called with correct error. +// Tests the case where a received route configuration update has no virtual +// hosts. Verifies that Error is called with the expected error. func (s) TestNoVirtualHost(t *testing.T) { nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) + mgmtServer, bc := setupManagementServerForTest(t, nodeID, false) xdsClient := createXDSClient(t, bc) watcher := &testWatcher{ @@ -466,15 +523,16 @@ func (s) TestNoVirtualHost(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - route.VirtualHosts = nil - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) + // Make the virtual host match nil so that the route config is NACKed. + resources.Routes[0].VirtualHosts = nil + if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -490,9 +548,7 @@ func (s) TestNoVirtualHost(t *testing.T) { // route resource with no virtual host, which also results in error being sent // across. func (s) TestNoVirtualHost_ExistingResource(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), @@ -502,15 +558,13 @@ func (s) TestNoVirtualHost_ExistingResource(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // Send valid listener and route. 
- listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -519,38 +573,13 @@ func (s) TestNoVirtualHost_ExistingResource(t *testing.T) { defer dm.Close() // Verify valid update. - wantXdsConfig := &xdsresource.XDSConfig{ - Listener: &xdsresource.ListenerUpdate{ - RouteConfigName: defaultTestRouteConfigName, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, - }, - RouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, - }, - }, - }, - VirtualHost: &xdsresource.VirtualHost{ - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, - }, - } + wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Endpoints[0].ClusterName, "localhost:8080") if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } // 3. Send route update with no virtual host. - route.VirtualHosts = nil + resources.Routes[0].VirtualHosts = nil if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -571,26 +600,24 @@ func (s) TestAmbientError(t *testing.T) { // Expect a warning log for the ambient error. grpctest.ExpectWarning("Listener resource ambient error") - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), errorCh: make(chan error), } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // Configure a valid listener and route. - listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } + // Configure a valid resources. + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -599,32 +626,7 @@ func (s) TestAmbientError(t *testing.T) { defer dm.Close() // Wait for the initial valid update. 
- wantXdsConfig := &xdsresource.XDSConfig{ - Listener: &xdsresource.ListenerUpdate{ - RouteConfigName: defaultTestRouteConfigName, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, - }, - RouteConfig: &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, - }, - }, - }, - VirtualHost: &xdsresource.VirtualHost{ - Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, - }, - } + wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Clusters[0].EdsClusterConfig.ServiceName, "localhost:8080") if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } @@ -648,12 +650,13 @@ func (s) TestAmbientError(t *testing.T) { }}, } resources.Listeners = []*v3listenerpb.Listener{lis} + resources.SkipValidation = true if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // We expect no call to Error or Update on our watcher. We just wait for - // a short duration to ensure that. + // We expect no call to Error or Update on our watcher. We just wait for a + // short duration to ensure that. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() select { @@ -665,14 +668,13 @@ func (s) TestAmbientError(t *testing.T) { } // Send valid resources again. - listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, - } + resources = e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -684,9 +686,7 @@ func (s) TestAmbientError(t *testing.T) { // Tests the case where the cluster name changes in the route resource update // and verify that each time Update is called with correct cluster name. 
func (s) TestRouteResourceUpdate(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - xdsClient := createXDSClient(t, bc) + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) watcher := &testWatcher{ updateCh: make(chan *xdsresource.XDSConfig), @@ -695,15 +695,100 @@ func (s) TestRouteResourceUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // Initial resources with defaultTestClusterName - listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{route}, - SkipValidation: true, + // Configure initial resources + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + // Wait for the first update. + wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Clusters[0].EdsClusterConfig.ServiceName, "localhost:8080") + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) } + + // Update route to point to a new cluster. + newClusterName := "new-cluster-name" + route2 := e2e.DefaultRouteConfig(resources.Routes[0].Name, defaultTestServiceName, newClusterName) + cluster2 := e2e.DefaultCluster(newClusterName, newClusterName, e2e.SecurityLevelNone) + endpoints2 := e2e.DefaultEndpoint(newClusterName, "localhost", []uint32{8081}) + resources.Routes = []*v3routepb.RouteConfiguration{route2} + resources.Clusters = []*v3clusterpb.Cluster{cluster2} + resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{endpoints2} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Wait for the second update and verify it has the new cluster. 
+	wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName
+	wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName
+	wantXdsConfig.Clusters = map[string]*xdsresource.ClusterResult{
+		newClusterName: {
+			Config: xdsresource.ClusterConfig{
+				Cluster: &xdsresource.ClusterUpdate{
+					ClusterName:    newClusterName,
+					ClusterType:    xdsresource.ClusterTypeEDS,
+					EDSServiceName: newClusterName,
+				},
+				EndpointConfig: &xdsresource.EndpointConfig{
+					EDSUpdate: &xdsresource.EndpointsUpdate{
+						Localities: []xdsresource.Locality{
+							{ID: clients.Locality{
+								Region:  "region-1",
+								Zone:    "zone-1",
+								SubZone: "subzone-1",
+							},
+								Endpoints: []xdsresource.Endpoint{
+									{
+										ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "localhost:8081"}}},
+										HealthStatus:     xdsresource.EndpointHealthStatusUnknown,
+										Weight:           1,
+									},
+								},
+								Weight: 1,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where the route resource is first sent from the management
+// server, then changed to be inline with the listener, and then changed back
+// to being received from the management server. It verifies that each time
+// Update is called with the correct XDSConfig.
+func (s) TestRouteResourceChangeToInline(t *testing.T) {
+	nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false)
+
+	watcher := &testWatcher{
+		updateCh: make(chan *xdsresource.XDSConfig),
+		errorCh:  make(chan error),
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	// Initial resources with defaultTestClusterName.
+	resources := e2e.DefaultClientResources(e2e.ResourceParams{
+		NodeID:     nodeID,
+		DialTarget: defaultTestServiceName,
+		Host:       "localhost",
+		Port:       8080,
+		SecLevel:   e2e.SecurityLevelNone,
+	})
 	if err := mgmtServer.Update(ctx, resources); err != nil {
 		t.Fatal(err)
 	}
@@ -712,10 +797,43 @@ func (s) TestRouteResourceUpdate(t *testing.T) {
 	defer dm.Close()
 
 	// Wait for the first update.
-	wantXdsConfig := &xdsresource.XDSConfig{
+	wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Clusters[0].EdsClusterConfig.ServiceName, "localhost:8080")
+	if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+		t.Fatal(err)
+	}
+
+	// Update route to point to a new cluster and make it inline with the
+	// listener.
+	newClusterName := "new-cluster-name"
+	hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
+		RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
+			RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName),
+		},
+		HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc
+	})
+	resources.Listeners[0].ApiListener.ApiListener = hcm
+	resources.Clusters = []*v3clusterpb.Cluster{e2e.DefaultCluster(newClusterName, defaultTestEDSServiceName, e2e.SecurityLevelNone)}
+	resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(defaultTestEDSServiceName, "localhost", []uint32{8081})}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for the second update and verify it has the new cluster.
+ wantInlineXdsConfig := &xdsresource.XDSConfig{ Listener: &xdsresource.ListenerUpdate{ - RouteConfigName: defaultTestRouteConfigName, - HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + InlineRouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: newClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, }, RouteConfig: &xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ @@ -723,7 +841,7 @@ func (s) TestRouteResourceUpdate(t *testing.T) { Domains: []string{defaultTestServiceName}, Routes: []*xdsresource.Route{{ Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + WeightedClusters: []xdsresource.WeightedCluster{{Name: newClusterName, Weight: 100}}, ActionType: xdsresource.RouteActionRoute, }}, }, @@ -733,40 +851,180 @@ func (s) TestRouteResourceUpdate(t *testing.T) { Domains: []string{defaultTestServiceName}, Routes: []*xdsresource.Route{{ Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, - ActionType: xdsresource.RouteActionRoute, - }}, + WeightedClusters: []xdsresource.WeightedCluster{{Name: newClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}, + }, }, + Clusters: map[string]*xdsresource.ClusterResult{ + newClusterName: { + Config: xdsresource.ClusterConfig{Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: newClusterName, + EDSServiceName: defaultTestEDSServiceName, + }, + EndpointConfig: &xdsresource.EndpointConfig{ + EDSUpdate: &xdsresource.EndpointsUpdate{ + Localities: []xdsresource.Locality{ + {ID: clients.Locality{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "localhost:8081"}}}, + HealthStatus: xdsresource.EndpointHealthStatusUnknown, + Weight: 1, + }, + }, + Weight: 1, + }, + }, + }, + }, + }, + }, + }, + } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantInlineXdsConfig); err != nil { + t.Fatal(err) + } + + // Change the route resource back to non-inline. + resources = e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } +} + +// Tests the case where the dependency manager receives a cluster resource error +// and verifies that Update is called with XDSConfig containing cluster error. 
+func (s) TestClusterResourceError(t *testing.T) { + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + done: make(chan struct{}), + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) + resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{}} - // Update route to point to a new cluster. - newClusterName := "new-cluster-name" - route2 := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName) - resources.Routes = []*v3routepb.RouteConfiguration{route2} if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Wait for the second update and verify it has the new cluster. - wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName - wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Clusters[0].EdsClusterConfig.ServiceName, "localhost:8080") + wantXdsConfig.Clusters[resources.Clusters[0].Name] = &xdsresource.ClusterResult{Err: fmt.Errorf("[xDS node id: %v]: %v", nodeID, fmt.Errorf("unsupported config_source_specifier *corev3.ConfigSource_Ads in lrs_server field"))} + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } + // Close the watcher done channel to stop sending updates because management + // server keeps sending the error updates repeatedly causing the update from + // dependency manager to be blocked. + close(watcher.done) } -// Tests the case where the route resource is first sent from the management -// server and the changed to be inline with the listener and then again changed -// to be received from the management server. It verifies that each time Update -// called with the correct XDSConfig. -func (s) TestRouteResourceChangeToInline(t *testing.T) { - nodeID := uuid.New().String() - mgmtServer, bc := setupManagementServerForTest(t, nodeID) - defer mgmtServer.Stop() - xdsClient := createXDSClient(t, bc) +// Tests the case where the dependency manager receives a cluster resource +// ambient error. A valid cluster resource is sent first, then an invalid +// one and then the valid resource again. The valid resource is sent again +// to make sure that the ambient error reaches the dependency manager since +// there is no other way to wait for it. +func (s) TestClusterAmbientError(t *testing.T) { + // Expect a warning log for the ambient error. 
+	grpctest.ExpectWarning("Cluster resource ambient error")
+
+	nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, false)
+
+	watcher := &testWatcher{
+		updateCh: make(chan *xdsresource.XDSConfig),
+		errorCh:  make(chan error),
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	resources := e2e.DefaultClientResources(e2e.ResourceParams{
+		NodeID:     nodeID,
+		DialTarget: defaultTestServiceName,
+		Host:       "localhost",
+		Port:       8080,
+		SecLevel:   e2e.SecurityLevelNone,
+	})
+
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+	defer dm.Close()
+
+	wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Clusters[0].EdsClusterConfig.ServiceName, "localhost:8080")
+	if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+		t.Fatal(err)
+	}
+
+	// Configure a cluster resource that is expected to be NACKed because it
+	// sets the `LrsServer` field to an unsupported config source. Since a
+	// valid resource is already cached, this should result in an ambient
+	// error.
+	resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{}}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-time.After(defaultTestShortTimeout):
+	case update := <-watcher.updateCh:
+		t.Fatalf("received unexpected update from dependency manager: %v", update)
+	}
+
+	// Send valid resources again to guarantee we get the cluster ambient error
+	// before the test ends.
+	resources = e2e.DefaultClientResources(e2e.ResourceParams{
+		NodeID:     nodeID,
+		DialTarget: defaultTestServiceName,
+		Host:       "localhost",
+		Port:       8080,
+		SecLevel:   e2e.SecurityLevelNone,
+	})
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where a cluster is an aggregate cluster whose children are of
+// type EDS and DNS. Verifies that Update is not called when one of the child
+// resources is not configured, and then verifies that Update is called with
+// the correct config when all resources are configured.
+func (s) TestAggregateCluster(t *testing.T) {
+	nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, true)
 
 	watcher := &testWatcher{
 		updateCh: make(chan *xdsresource.XDSConfig),
@@ -775,14 +1033,17 @@ func (s) TestRouteResourceChangeToInline(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
 
-	// Initial resources with defaultTestClusterName
 	listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)
 	route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)
+	aggregateCluster := makeAggregateClusterResource(defaultTestClusterName, []string{"eds-cluster", "dns-cluster"})
+	edsCluster := e2e.DefaultCluster("eds-cluster", defaultTestEDSServiceName, e2e.SecurityLevelNone)
+	endpoint := e2e.DefaultEndpoint(defaultTestEDSServiceName, "localhost", []uint32{8080})
 	resources := e2e.UpdateOptions{
-		NodeID:         nodeID,
-		Listeners:      []*v3listenerpb.Listener{listener},
-		Routes:         []*v3routepb.RouteConfiguration{route},
-		SkipValidation: true,
+		NodeID:    nodeID,
+		Listeners: []*v3listenerpb.Listener{listener},
+		Routes:    []*v3routepb.RouteConfiguration{route},
+		Clusters:  []*v3clusterpb.Cluster{aggregateCluster, edsCluster},
+		Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoint},
 	}
 	if err := mgmtServer.Update(ctx, resources); err != nil {
 		t.Fatal(err)
@@ -791,10 +1052,26 @@ func (s) TestRouteResourceChangeToInline(t *testing.T) {
 	dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
 	defer dm.Close()
 
-	// Wait for the first update.
+	// Verify that no configuration is pushed to the watcher yet, because not
+	// all clusters making up the aggregate cluster have been resolved yet.
+	select {
+	case <-time.After(defaultTestShortTimeout):
+	case update := <-watcher.updateCh:
+		t.Fatalf("received unexpected update from dependency manager: %+v", update)
+	case err := <-watcher.errorCh:
+		t.Fatalf("received unexpected error from dependency manager: %v", err)
+	}
+
+	// Now configure the LogicalDNS cluster on the management server. This
+	// should result in configuration being pushed to the watcher.
+ resources.Clusters = append(resources.Clusters, makeLogicalDNSClusterResource("dns-cluster", "localhost", 8081)) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantXdsConfig := &xdsresource.XDSConfig{ Listener: &xdsresource.ListenerUpdate{ - RouteConfigName: defaultTestRouteConfigName, + RouteConfigName: resources.Routes[0].Name, HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, }, RouteConfig: &xdsresource.RouteConfigUpdate{ @@ -803,12 +1080,138 @@ func (s) TestRouteResourceChangeToInline(t *testing.T) { Domains: []string{defaultTestServiceName}, Routes: []*xdsresource.Route{{ Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + WeightedClusters: []xdsresource.WeightedCluster{{Name: resources.Clusters[0].Name, Weight: 100}}, ActionType: xdsresource.RouteActionRoute, }}, }, }, }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: resources.Clusters[0].Name, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute}, + }}, + Clusters: map[string]*xdsresource.ClusterResult{ + resources.Clusters[0].Name: { + Config: xdsresource.ClusterConfig{Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: resources.Clusters[0].Name, + PrioritizedClusterNames: []string{"eds-cluster", "dns-cluster"}, + }, + AggregateConfig: &xdsresource.AggregateConfig{LeafClusters: []string{"eds-cluster", "dns-cluster"}}, + }, + }, + "eds-cluster": { + Config: xdsresource.ClusterConfig{ + Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "eds-cluster", + EDSServiceName: defaultTestEDSServiceName}, + EndpointConfig: &xdsresource.EndpointConfig{ + EDSUpdate: &xdsresource.EndpointsUpdate{ + Localities: []xdsresource.Locality{ + {ID: clients.Locality{ + Region: "region-1", + Zone: "zone-1", + SubZone: "subzone-1", + }, + Endpoints: []xdsresource.Endpoint{ + { + ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "localhost:8080"}}}, + HealthStatus: xdsresource.EndpointHealthStatusUnknown, + Weight: 1, + }, + }, + Weight: 1, + }, + }, + }, + }, + }}, + "dns-cluster": { + Config: xdsresource.ClusterConfig{ + Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeLogicalDNS, + ClusterName: "dns-cluster", + DNSHostName: "localhost:8081", + }, + EndpointConfig: &xdsresource.EndpointConfig{ + DNSEndpoints: &xdsresource.DNSUpdate{ + Endpoints: []resolver.Endpoint{ + { + Addresses: []resolver.Address{ + {Addr: "[::1]:8081"}, + }, + }, + { + Addresses: []resolver.Address{ + {Addr: "127.0.0.1:8081"}, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } + +} + +// Tests the case where an aggregate cluster has one child whose resource is +// configured with an error. Verifies that the error is correctly received in +// the XDSConfig. 
+func (s) TestAggregateClusterChildError(t *testing.T) { + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, true) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + done: make(chan struct{}), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(defaultTestClusterName, []string{"err-cluster", "good-cluster"}), + e2e.DefaultCluster("err-cluster", defaultTestEDSServiceName, e2e.SecurityLevelNone), + makeLogicalDNSClusterResource("good-cluster", "localhost", 8081), + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(defaultTestEDSServiceName, "localhost", []uint32{8080})}, + } + resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + wantXdsConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }}, + }, VirtualHost: &xdsresource.VirtualHost{ Domains: []string{defaultTestServiceName}, Routes: []*xdsresource.Route{{ @@ -817,63 +1220,257 @@ func (s) TestRouteResourceChangeToInline(t *testing.T) { ActionType: xdsresource.RouteActionRoute, }}, }, + Clusters: map[string]*xdsresource.ClusterResult{ + defaultTestClusterName: { + Config: xdsresource.ClusterConfig{ + Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: defaultTestClusterName, + PrioritizedClusterNames: []string{"err-cluster", "good-cluster"}, + }, + AggregateConfig: &xdsresource.AggregateConfig{LeafClusters: []string{"err-cluster", "good-cluster"}}, + }, + }, + "err-cluster": { + Config: xdsresource.ClusterConfig{ + Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "err-cluster", + EDSServiceName: defaultTestEDSServiceName, + }, + EndpointConfig: &xdsresource.EndpointConfig{ + ResolutionNote: fmt.Errorf("[xDS node id: %v]: %v", nodeID, fmt.Errorf("EDS response contains an endpoint with zero weight: endpoint:{address:{socket_address:{address:%q port_value:%v}}} load_balancing_weight:{}", "localhost", 8080)), + }, + }, + }, + "good-cluster": { + Config: xdsresource.ClusterConfig{ + Cluster: &xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeLogicalDNS, + ClusterName: "good-cluster", + DNSHostName: "localhost:8081", + }, + EndpointConfig: &xdsresource.EndpointConfig{ + DNSEndpoints: &xdsresource.DNSUpdate{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "[::1]:8081"}}}, + {Addresses: 
[]resolver.Address{{Addr: "127.0.0.1:8081"}}}, + }, + }, + }, + }, + }, + }, } + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } - // Update route to point to a new cluster. - newClusterName := "new-cluster-name" - hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ - RouteConfig: e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName), + // Close the watcher done channel to stop sending updates because management + // server keeps sending the error updates repeatedly causing the update from + // dependency manager to be blocked. + close(watcher.done) +} + +// Tests the case where an aggregate cluster has no leaf clusters by creating a +// cyclic dependency where A->B and B->A. Verifies that an error with "no leaf +// clusters found" is received. +func (s) TestAggregateClusterNoLeafCluster(t *testing.T) { + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, true) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + const ( + clusterNameA = defaultTestClusterName // cluster name in cds LB policy config + clusterNameB = defaultTestClusterName + "-B" + ) + // Create a cyclic dependency where A->B and B->A. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterNameA, []string{clusterNameB}), + makeAggregateClusterResource(clusterNameB, []string{clusterNameA}), }, - HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc - }) - resources.Listeners[0].ApiListener.ApiListener = hcm - resources.Routes = nil + } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } - // Wait for the second update and verify it has the new cluster. 
- wantXdsConfig.Listener.InlineRouteConfig = &xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + + wantXdsConfig := &xdsresource.XDSConfig{ + Listener: &xdsresource.ListenerUpdate{ + RouteConfigName: defaultTestRouteConfigName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + RouteConfig: &xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{{ Domains: []string{defaultTestServiceName}, - Routes: []*xdsresource.Route{{Prefix: newStringP("/"), - WeightedClusters: []xdsresource.WeightedCluster{{Name: newClusterName, Weight: 100}}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, ActionType: xdsresource.RouteActionRoute, }}, + }}, + }, + VirtualHost: &xdsresource.VirtualHost{ + Domains: []string{defaultTestServiceName}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("/"), + WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}}, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + Clusters: map[string]*xdsresource.ClusterResult{ + defaultTestClusterName: { + Err: fmt.Errorf("[xDS node id: %v]: %v", nodeID, fmt.Errorf("aggregate cluster graph has no leaf clusters")), + }, + clusterNameB: { + Err: fmt.Errorf("[xDS node id: %v]: %v", nodeID, fmt.Errorf("aggregate cluster graph has no leaf clusters")), }, }, } - wantXdsConfig.Listener.RouteConfigName = "" - wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = newClusterName - wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = newClusterName + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } +} - // Change the route resource back to non-inline. - listener = e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) - route = e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) - resources = e2e.UpdateOptions{ +// Tests the case where nested aggregate clusters exceed the max depth of 16. +// Verify that the error is correctly received in the XDSConfig in all the +// clusters. +func (s) TestAggregateClusterMaxDepth(t *testing.T) { + const clusterDepth = 17 + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, true) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create a graph of aggregate clusters with 18 clusters. 
+	clusters := make([]*v3clusterpb.Cluster, clusterDepth)
+	for i := 0; i < clusterDepth; i++ {
+		clusters[i] = makeAggregateClusterResource(fmt.Sprintf("agg-%d", i), []string{fmt.Sprintf("agg-%d", i+1)})
+	}
+
+	resources := e2e.UpdateOptions{
 		NodeID:         nodeID,
-		Listeners:      []*v3listenerpb.Listener{listener},
-		Routes:         []*v3routepb.RouteConfiguration{route},
+		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)},
+		Routes:         []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, "agg-0")},
+		Clusters:       clusters,
 		SkipValidation: true,
 	}
 	if err := mgmtServer.Update(ctx, resources); err != nil {
 		t.Fatal(err)
 	}
 
-	// Wait for the third update and verify it has the original cluster.
-	wantXdsConfig.Listener.InlineRouteConfig = nil
-	wantXdsConfig.Listener.RouteConfigName = defaultTestRouteConfigName
-	wantXdsConfig.RouteConfig.VirtualHosts[0].Routes[0].WeightedClusters[0].Name = defaultTestClusterName
-	wantXdsConfig.VirtualHost.Routes[0].WeightedClusters[0].Name = defaultTestClusterName
+	dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher)
+	defer dm.Close()
+
+	commonError := fmt.Errorf("[xDS node id: %v]: %v", nodeID, fmt.Errorf("aggregate cluster graph exceeds max depth (%d)", 16))
+
+	wantXdsConfig := &xdsresource.XDSConfig{
+		Listener: &xdsresource.ListenerUpdate{
+			RouteConfigName: defaultTestRouteConfigName,
+			HTTPFilters:     []xdsresource.HTTPFilter{{Name: "router"}},
+		},
+		RouteConfig: &xdsresource.RouteConfigUpdate{
+			VirtualHosts: []*xdsresource.VirtualHost{{
+				Domains: []string{defaultTestServiceName},
+				Routes: []*xdsresource.Route{{
+					// The route should point to the first cluster in the chain:
+					// agg-0.
+					Prefix:           newStringP("/"),
+					WeightedClusters: []xdsresource.WeightedCluster{{Name: "agg-0", Weight: 100}},
+					ActionType:       xdsresource.RouteActionRoute,
+				}},
+			}},
+		},
+		VirtualHost: &xdsresource.VirtualHost{
+			Domains: []string{defaultTestServiceName},
+			Routes: []*xdsresource.Route{{
+				Prefix:           newStringP("/"),
+				WeightedClusters: []xdsresource.WeightedCluster{{Name: "agg-0", Weight: 100}},
+				ActionType:       xdsresource.RouteActionRoute,
+			}},
+		},
+		Clusters: map[string]*xdsresource.ClusterResult{}, // Initialize the map.
+	}
+
+	// Populate the Clusters map with all of the clusters, each carrying the
+	// common error.
+	for i := 0; i < clusterDepth; i++ {
+		clusterName := fmt.Sprintf("agg-%d", i)
+
+		// The ClusterResult only needs the Err field set to the common error.
+		wantXdsConfig.Clusters[clusterName] = &xdsresource.ClusterResult{
+			Err: commonError,
+		}
+	}
+
+	if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the scenario where the endpoint watcher receives an ambient error. It
+// verifies that the error is stored in the resolution note and that the
+// previously received update is retained.
+func (s) TestEndpointAmbientError(t *testing.T) { + nodeID, mgmtServer, xdsClient := setupManagementServerAndClient(t, true) + + watcher := &testWatcher{ + updateCh: make(chan *xdsresource.XDSConfig), + errorCh: make(chan error), + done: make(chan struct{}), + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + NodeID: nodeID, + DialTarget: defaultTestServiceName, + Host: "localhost", + Port: 8080, + SecLevel: e2e.SecurityLevelNone, + }) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + dm := xdsdepmgr.New(defaultTestServiceName, defaultTestServiceName, xdsClient, watcher) + defer dm.Close() + wantXdsConfig := makeXDSConfig(resources.Routes[0].Name, resources.Clusters[0].Name, resources.Endpoints[0].ClusterName, "localhost:8080") + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { t.Fatal(err) } + + // Send an ambient error for the endpoint resource by setting the weight to + // 0. + resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0} + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantXdsConfig.Clusters[resources.Clusters[0].Name].Config.EndpointConfig.ResolutionNote = fmt.Errorf("[xDS node id: %v]: %v", nodeID, fmt.Errorf("EDS response contains an endpoint with zero weight: endpoint:{address:{socket_address:{address:%q port_value:%v}}} load_balancing_weight:{}", "localhost", 8080)) + if err := verifyXDSConfig(ctx, watcher.updateCh, watcher.errorCh, wantXdsConfig); err != nil { + t.Fatal(err) + } + + // Close the watcher done channel to stop sending updates because management + // server keeps sending the error updates repeatedly causing the update from + // dependency manager to be blocked. + close(watcher.done) }