Skip to content

Commit a6618d6

Browse files
committed
pass matched route to LB picker
1 parent 4c27cc8 commit a6618d6

File tree

4 files changed

+212
-34
lines changed

4 files changed

+212
-34
lines changed

internal/xds/resolver/serviceconfig.go

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ import (
2626
rand "math/rand/v2"
2727
"strings"
2828
"sync/atomic"
29-
"time"
3029

3130
xxhash "github.com/cespare/xxhash/v2"
3231
"google.golang.org/grpc/codes"
3332
"google.golang.org/grpc/internal/grpcutil"
3433
iresolver "google.golang.org/grpc/internal/resolver"
3534
iringhash "google.golang.org/grpc/internal/ringhash"
3635
"google.golang.org/grpc/internal/serviceconfig"
37-
"google.golang.org/grpc/internal/wrr"
3836
"google.golang.org/grpc/internal/xds/balancer/clustermanager"
3937
"google.golang.org/grpc/internal/xds/httpfilter"
4038
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
@@ -107,19 +105,6 @@ type routeCluster struct {
107105
interceptor iresolver.ClientInterceptor // HTTP filters to run for RPCs matching this route.
108106
}
109107

110-
type route struct {
111-
m *xdsresource.CompositeMatcher // converted from route matchers
112-
actionType xdsresource.RouteActionType // holds route action type
113-
clusters wrr.WRR // holds *routeCluster entries
114-
maxStreamDuration time.Duration
115-
retryConfig *xdsresource.RetryConfig
116-
hashPolicies []*xdsresource.HashPolicy
117-
}
118-
119-
func (r route) String() string {
120-
return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.m.String(), r.clusters, r.maxStreamDuration)
121-
}
122-
123108
// stoppableConfigSelector extends the iresolver.ConfigSelector interface with a
124109
// stop() method. This makes it possible to swap the current config selector
125110
// with an erroring config selector when the LDS or RDS resource is not found on
@@ -152,7 +137,7 @@ type configSelector struct {
152137

153138
// Configuration received from the xDS management server.
154139
virtualHost virtualHost
155-
routes []route
140+
routes []xdsresource.MatchedRoute
156141
clusters map[string]*clusterInfo
157142
httpFilterConfig []xdsresource.HTTPFilter
158143
}
@@ -168,24 +153,24 @@ func annotateErrorWithNodeID(err error, nodeID string) error {
168153
}
169154

170155
func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
171-
var rt *route
156+
var rt *xdsresource.MatchedRoute
172157
// Loop through routes in order and select first match.
173158
for _, r := range cs.routes {
174-
if r.m.Match(rpcInfo) {
159+
if r.M.Match(rpcInfo) {
175160
rt = &r
176161
break
177162
}
178163
}
179164

180-
if rt == nil || rt.clusters == nil {
165+
if rt == nil || rt.Clusters == nil {
181166
return nil, annotateErrorWithNodeID(errNoMatchedRouteFound, cs.xdsNodeID)
182167
}
183168

184-
if rt.actionType != xdsresource.RouteActionRoute {
169+
if rt.ActionType != xdsresource.RouteActionRoute {
185170
return nil, annotateErrorWithNodeID(errUnsupportedClientRouteAction, cs.xdsNodeID)
186171
}
187172

188-
cluster, ok := rt.clusters.Next().(*routeCluster)
173+
cluster, ok := rt.Clusters.Next().(*routeCluster)
189174
if !ok {
190175
return nil, annotateErrorWithNodeID(status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster), cs.xdsNodeID)
191176
}
@@ -196,7 +181,8 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
196181
atomic.AddInt32(ref, 1)
197182

198183
lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
199-
lbCtx = iringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))
184+
lbCtx = iringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.HashPolicies))
185+
lbCtx = xdsresource.SetMatchedRoute(lbCtx, *rt)
200186

201187
config := &iresolver.RPCConfig{
202188
// Communicate to the LB policy the chosen cluster and request hash, if Ring Hash LB policy.
@@ -213,11 +199,11 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
213199
Interceptor: cluster.interceptor,
214200
}
215201

216-
if rt.maxStreamDuration != 0 {
217-
config.MethodConfig.Timeout = &rt.maxStreamDuration
202+
if rt.MaxStreamDuration != 0 {
203+
config.MethodConfig.Timeout = &rt.MaxStreamDuration
218204
}
219-
if rt.retryConfig != nil {
220-
config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
205+
if rt.RetryConfig != nil {
206+
config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.RetryConfig)
221207
} else if cs.virtualHost.retryConfig != nil {
222208
config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
223209
}

internal/xds/resolver/xds_resolver.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
351351
virtualHost: virtualHost{
352352
retryConfig: r.xdsConfig.VirtualHost.RetryConfig,
353353
},
354-
routes: make([]route, len(r.xdsConfig.VirtualHost.Routes)),
354+
routes: make([]xdsresource.MatchedRoute, len(r.xdsConfig.VirtualHost.Routes)),
355355
clusters: make(map[string]*clusterInfo),
356356
httpFilterConfig: r.xdsConfig.Listener.HTTPFilters,
357357
}
@@ -380,18 +380,20 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
380380
cs.clusters[clusterName] = ci
381381
}
382382
}
383-
cs.routes[i].clusters = clusters
383+
cs.routes[i].Clusters = clusters
384384

385-
cs.routes[i].m = xdsresource.RouteToMatcher(rt)
386-
cs.routes[i].actionType = rt.ActionType
385+
cs.routes[i].M = xdsresource.RouteToMatcher(rt)
386+
cs.routes[i].ActionType = rt.ActionType
387387
if rt.MaxStreamDuration == nil {
388-
cs.routes[i].maxStreamDuration = r.xdsConfig.Listener.MaxStreamDuration
388+
cs.routes[i].MaxStreamDuration = r.xdsConfig.Listener.MaxStreamDuration
389389
} else {
390-
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
390+
cs.routes[i].MaxStreamDuration = *rt.MaxStreamDuration
391391
}
392392

393-
cs.routes[i].retryConfig = rt.RetryConfig
394-
cs.routes[i].hashPolicies = rt.HashPolicies
393+
cs.routes[i].HTTPFilterConfigOverride = rt.HTTPFilterConfigOverride
394+
cs.routes[i].RetryConfig = rt.RetryConfig
395+
cs.routes[i].HashPolicies = rt.HashPolicies
396+
cs.routes[i].AutoHostRewrite = rt.AutoHostRewrite
395397
}
396398

397399
// Account for this config selector's clusters. Do this after no further

internal/xds/resolver/xds_resolver_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,17 @@ import (
3535
"google.golang.org/grpc/codes"
3636
estats "google.golang.org/grpc/experimental/stats"
3737
"google.golang.org/grpc/internal"
38+
"google.golang.org/grpc/internal/envconfig"
3839
iresolver "google.golang.org/grpc/internal/resolver"
3940
iringhash "google.golang.org/grpc/internal/ringhash"
4041
"google.golang.org/grpc/internal/testutils"
4142
"google.golang.org/grpc/internal/testutils/xds/e2e"
4243
"google.golang.org/grpc/internal/xds/balancer/clustermanager"
4344
"google.golang.org/grpc/internal/xds/bootstrap"
45+
serverFeature "google.golang.org/grpc/internal/xds/clients/xdsclient"
4446
rinternal "google.golang.org/grpc/internal/xds/resolver/internal"
4547
"google.golang.org/grpc/internal/xds/xdsclient"
48+
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
4649
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
4750
"google.golang.org/grpc/metadata"
4851
"google.golang.org/grpc/resolver"
@@ -1297,3 +1300,123 @@ func (s) TestConfigSelector_FailureCases(t *testing.T) {
12971300
func newDurationP(d time.Duration) *time.Duration {
12981301
return &d
12991302
}
1303+
1304+
// TestResolver_AutoHostRewrite verifies the propagation of the AutoHostRewrite
1305+
// field from the xDS RouteConfiguration to the internal MatchedRoute.
1306+
//
1307+
// Per gRFC A81, this feature should only be active if three conditions are met:
1308+
// 1. The environment variable (XDSAuthorityRewrite) is enabled.
1309+
// 2. The xDS server is marked as a "trusted_xds_server" in the bootstrap config.
1310+
// 3. The specific Route Configuration requests `auto_host_rewrite=true`.
1311+
func TestResolver_AutoHostRewrite(t *testing.T) {
1312+
for _, tt := range []struct {
1313+
name string
1314+
autoHostRewrite bool
1315+
envconfig bool
1316+
serverfeature serverFeature.ServerFeature
1317+
wantAutoHostRewrite bool
1318+
}{
1319+
{
1320+
name: "AutoHostRewrite enabled",
1321+
autoHostRewrite: true,
1322+
envconfig: true,
1323+
serverfeature: serverFeature.ServerFeatureTrustedXDSServer,
1324+
wantAutoHostRewrite: true,
1325+
},
1326+
{
1327+
name: "Disable_EnvVarOff",
1328+
autoHostRewrite: true,
1329+
envconfig: false,
1330+
serverfeature: serverFeature.ServerFeatureTrustedXDSServer,
1331+
wantAutoHostRewrite: false,
1332+
},
1333+
{
1334+
name: "Disable_UntrustedServer",
1335+
autoHostRewrite: true,
1336+
envconfig: true,
1337+
wantAutoHostRewrite: false,
1338+
},
1339+
{
1340+
name: "Route config with AutoHostRewrite disabled",
1341+
autoHostRewrite: false,
1342+
envconfig: true,
1343+
serverfeature: serverFeature.ServerFeatureTrustedXDSServer,
1344+
wantAutoHostRewrite: false,
1345+
},
1346+
} {
1347+
t.Run(tt.name, func(t *testing.T) {
1348+
testutils.SetEnvConfig(t, &envconfig.XDSAuthorityRewrite, tt.envconfig)
1349+
1350+
// Spin up an xDS management server for the test.
1351+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1352+
defer cancel()
1353+
nodeID := uuid.New().String()
1354+
mgmtServer, _, _, _ := setupManagementServerForTest(t, nodeID)
1355+
1356+
// Configure the management server with a good listener resource and a
1357+
// route configuration resource, as specified by the test case.
1358+
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
1359+
clusters := []*v3clusterpb.Cluster{e2e.DefaultCluster(defaultTestClusterName, defaultTestEndpointName, e2e.SecurityLevelNone)}
1360+
endpoints := []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(defaultTestEndpointName, defaultTestHostname, defaultTestPort)}
1361+
routes := []*v3routepb.RouteConfiguration{{
1362+
Name: defaultTestRouteConfigName,
1363+
VirtualHosts: []*v3routepb.VirtualHost{{
1364+
Domains: []string{defaultTestServiceName},
1365+
Routes: []*v3routepb.Route{{
1366+
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
1367+
Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
1368+
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
1369+
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
1370+
{
1371+
Name: defaultTestClusterName,
1372+
Weight: &wrapperspb.UInt32Value{Value: 100},
1373+
},
1374+
},
1375+
}},
1376+
HostRewriteSpecifier: &v3routepb.RouteAction_AutoHostRewrite{
1377+
AutoHostRewrite: &wrapperspb.BoolValue{Value: tt.autoHostRewrite},
1378+
},
1379+
}},
1380+
}},
1381+
}},
1382+
}}
1383+
configureAllResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes, clusters, endpoints)
1384+
1385+
trustedXdsServer := "[]"
1386+
if tt.serverfeature == serverFeature.ServerFeatureTrustedXDSServer {
1387+
trustedXdsServer = `["trusted_xds_server"]`
1388+
}
1389+
1390+
opts := bootstrap.ConfigOptionsForTesting{
1391+
Servers: []byte(fmt.Sprintf(`[{
1392+
"server_uri": %q,
1393+
"channel_creds": [{"type": "insecure"}],
1394+
"server_features": %s
1395+
}]`, mgmtServer.Address, trustedXdsServer)),
1396+
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
1397+
}
1398+
1399+
contents, err := bootstrap.NewContentsForTesting(opts)
1400+
if err != nil {
1401+
t.Fatalf("Failed to create bootstrap configuration: %v", err)
1402+
}
1403+
1404+
// Build the resolver and read the config selector out of it.
1405+
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, contents)
1406+
cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
1407+
1408+
res, err := cs.SelectConfig(iresolver.RPCInfo{
1409+
Context: ctx,
1410+
Method: "/service/method",
1411+
})
1412+
if err != nil {
1413+
t.Fatalf("cs.SelectConfig(): %v", err)
1414+
}
1415+
1416+
gotRoute := xdsresource.GetMatchedRouteForTesting(res.Context)
1417+
if gotRoute.AutoHostRewrite != tt.wantAutoHostRewrite {
1418+
t.Fatalf("Got autoHostRewrite: %v, want: %v", gotRoute.AutoHostRewrite, tt.wantAutoHostRewrite)
1419+
}
1420+
})
1421+
}
1422+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
*
3+
* Copyright 2022 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package xdsresource
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"time"
24+
25+
"google.golang.org/grpc/internal/wrr"
26+
"google.golang.org/grpc/internal/xds/httpfilter"
27+
)
28+
29+
// MatchedRoute holds configuration for a single route selected by the xDS
30+
// resolver's ConfigSelector.
31+
type MatchedRoute struct {
32+
M *CompositeMatcher // converted from route matchers
33+
ActionType RouteActionType // holds route action type
34+
Clusters wrr.WRR // holds *routeCluster entries
35+
MaxStreamDuration time.Duration
36+
// map from filter name to its config
37+
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
38+
RetryConfig *RetryConfig
39+
HashPolicies []*HashPolicy
40+
AutoHostRewrite bool
41+
}
42+
43+
func (r MatchedRoute) String() string {
44+
return fmt.Sprintf("%s -> { clusters: %v, maxStreamDuration: %v }", r.M.String(), r.Clusters, r.MaxStreamDuration)
45+
}
46+
47+
// XdsRouteAttributeKey is the context key used to store the MatchedRoute
48+
// in the RPC context.
49+
type XdsRouteAttributeKey struct{}
50+
51+
// GetMatchedRoute retrieves the MatchedRoute from the provided context.
52+
func GetMatchedRoute(ctx context.Context) MatchedRoute {
53+
route, _ := ctx.Value(XdsRouteAttributeKey{}).(MatchedRoute)
54+
return route
55+
}
56+
57+
// GetMatchedRouteForTesting returns the matched route in the context; to be
58+
// used for testing only.
59+
func GetMatchedRouteForTesting(ctx context.Context) MatchedRoute {
60+
return GetMatchedRoute(ctx)
61+
}
62+
63+
// SetMatchedRoute adds the mathced route to the context for the
64+
// xds_cluster_impl LB policy to pick.
65+
func SetMatchedRoute(ctx context.Context, route MatchedRoute) context.Context {
66+
return context.WithValue(ctx, XdsRouteAttributeKey{}, route)
67+
}

0 commit comments

Comments
 (0)