diff --git a/internal/resolver/delegatingresolver/delegatingresolver.go b/internal/resolver/delegatingresolver/delegatingresolver.go index 7b93f692be01..c0e22757727a 100644 --- a/internal/resolver/delegatingresolver/delegatingresolver.go +++ b/internal/resolver/delegatingresolver/delegatingresolver.go @@ -28,6 +28,8 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/proxyattributes" + "google.golang.org/grpc/internal/transport" + "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -40,7 +42,7 @@ var ( // delegatingResolver manages both target URI and proxy address resolution by // delegating these tasks to separate child resolvers. Essentially, it acts as -// a intermediary between the gRPC ClientConn and the child resolvers. +// an intermediary between the gRPC ClientConn and the child resolvers. // // It implements the [resolver.Resolver] interface. type delegatingResolver struct { @@ -48,6 +50,9 @@ type delegatingResolver struct { cc resolver.ClientConn // gRPC ClientConn proxyURL *url.URL // proxy URL, derived from proxy environment and target + // We do not hold both mu and childMu in the same goroutine. Avoid holding + // both locks when calling into the child, as the child resolver may + // synchronously callback into the channel. mu sync.Mutex // protects all the fields below targetResolverState *resolver.State // state of the target resolver proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured @@ -66,8 +71,8 @@ func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {} func (nopResolver) Close() {} -// proxyURLForTarget determines the proxy URL for the given address based on -// the environment. It can return the following: +// proxyURLForTarget determines the proxy URL for the given address based on the +// environment. It can return the following: // - nil URL, nil error: No proxy is configured or the address is excluded // using the `NO_PROXY` environment variable or if req.URL.Host is // "localhost" (with or without // a port number) @@ -86,7 +91,8 @@ func proxyURLForTarget(address string) (*url.URL, error) { // resolvers: // - one to resolve the proxy address specified using the supported // environment variables. This uses the registered resolver for the "dns" -// scheme. +// scheme. It is lazily built when a target resolver update contains at least +// one TCP address. // - one to resolve the target URI using the resolver specified by the scheme // in the target URI or specified by the user using the WithResolvers dial // option. As a special case, if the target URI's scheme is "dns" and a @@ -95,8 +101,10 @@ func proxyURLForTarget(address string) (*url.URL, error) { // resolution is enabled using the dial option. func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) { r := &delegatingResolver{ - target: target, - cc: cc, + target: target, + cc: cc, + proxyResolver: nopResolver{}, + targetResolver: nopResolver{}, } var err error @@ -123,37 +131,26 @@ func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOpti // resolution should be handled by the proxy, not the client. Therefore, we // bypass the target resolver and store the unresolved target address. if target.URL.Scheme == "dns" && !targetResolutionEnabled { - state := resolver.State{ + r.targetResolverState = &resolver.State{ Addresses: []resolver.Address{{Addr: target.Endpoint()}}, Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}}, } - r.targetResolverState = &state - } else { - wcc := &wrappingClientConn{ - stateListener: r.updateTargetResolverState, - parent: r, - } - if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil { - return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err) - } - } - - if r.proxyResolver, err = r.proxyURIResolver(opts); err != nil { - return nil, fmt.Errorf("delegating_resolver: failed to build resolver for proxy URL %q: %v", r.proxyURL, err) + r.updateTargetResolverState(*r.targetResolverState) + return r, nil } - - if r.targetResolver == nil { - r.targetResolver = nopResolver{} + wcc := &wrappingClientConn{ + stateListener: r.updateTargetResolverState, + parent: r, } - if r.proxyResolver == nil { - r.proxyResolver = nopResolver{} + if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil { + return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err) } return r, nil } -// proxyURIResolver creates a resolver for resolving proxy URIs using the -// "dns" scheme. It adjusts the proxyURL to conform to the "dns:///" format and -// builds a resolver with a wrappingClientConn to capture resolved addresses. +// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns" +// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds +// a resolver with a wrappingClientConn to capture resolved addresses. func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) { proxyBuilder := resolver.Get("dns") if proxyBuilder == nil { @@ -189,18 +186,43 @@ func (r *delegatingResolver) Close() { r.proxyResolver = nil } -// updateClientConnStateLocked creates a list of combined addresses by -// pairing each proxy address with every target address. For each pair, it -// generates a new [resolver.Address] using the proxy address, and adding the -// target address as the attribute along with user info. It returns nil if -// either resolver has not sent update even once and returns the error from -// ClientConn update once both resolvers have sent update atleast once. +func networkTypeFromAddr(addr resolver.Address) string { + networkType, ok := networktype.Get(addr) + if !ok { + networkType, _ = transport.ParseDialTarget(addr.Addr) + } + return networkType +} + +func isTCPAddressPresent(state *resolver.State) bool { + for _, addr := range state.Addresses { + if networkType := networkTypeFromAddr(addr); networkType == "tcp" { + return true + } + } + for _, endpoint := range state.Endpoints { + for _, addr := range endpoint.Addresses { + if networktype := networkTypeFromAddr(addr); networktype == "tcp" { + return true + } + } + } + return false +} + +// updateClientConnStateLocked constructs a combined list of addresses by +// pairing each proxy address with every target address of type TCP. For each +// pair, it creates a new [resolver.Address] using the proxy address and +// attaches the corresponding target address and user info as attributes. Target +// addresses that are not of type TCP are appended to the list as-is. The +// function returns nil if either resolver has not yet provided an update, and +// returns the result of ClientConn.UpdateState once both resolvers have +// provided at least one update. func (r *delegatingResolver) updateClientConnStateLocked() error { if r.targetResolverState == nil || r.proxyAddrs == nil { return nil } - curState := *r.targetResolverState // If multiple resolved proxy addresses are present, we send only the // unresolved proxy host and let net.Dial handle the proxy host name // resolution when creating the transport. Sending all resolved addresses @@ -218,24 +240,30 @@ func (r *delegatingResolver) updateClientConnStateLocked() error { } var addresses []resolver.Address for _, targetAddr := range (*r.targetResolverState).Addresses { + // Avoid proxy when network is not tcp. + if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" { + addresses = append(addresses, targetAddr) + continue + } addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{ User: r.proxyURL.User, ConnectAddr: targetAddr.Addr, })) } - // Create a list of combined endpoints by pairing all proxy endpoints - // with every target endpoint. Each time, it constructs a new - // [resolver.Endpoint] using the all addresses from all the proxy endpoint - // and the target addresses from one endpoint. The target address and user - // information from the proxy URL are added as attributes to the proxy - // address.The resulting list of addresses is then grouped into endpoints, - // covering all combinations of proxy and target endpoints. + // For each target endpoint, construct a new [resolver.Endpoint] that + // includes all addresses from all proxy endpoints and the addresses from + // that target endpoint, preserving the number of target endpoints. var endpoints []resolver.Endpoint for _, endpt := range (*r.targetResolverState).Endpoints { var addrs []resolver.Address - for _, proxyAddr := range r.proxyAddrs { - for _, targetAddr := range endpt.Addresses { + for _, targetAddr := range endpt.Addresses { + // Avoid proxy when network is not tcp. + if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" { + addrs = append(addrs, targetAddr) + continue + } + for _, proxyAddr := range r.proxyAddrs { addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{ User: r.proxyURL.User, ConnectAddr: targetAddr.Addr, @@ -246,8 +274,9 @@ func (r *delegatingResolver) updateClientConnStateLocked() error { } // Use the targetResolverState for its service config and attributes // contents. The state update is only sent after both the target and proxy - // resolvers have sent their updates, and curState has been updated with - // the combined addresses. + // resolvers have sent their updates, and curState has been updated with the + // combined addresses. + curState := *r.targetResolverState curState.Addresses = addresses curState.Endpoints = endpoints return r.cc.UpdateState(curState) @@ -257,7 +286,8 @@ func (r *delegatingResolver) updateClientConnStateLocked() error { // addresses and endpoints, marking the resolver as ready, and triggering a // state update if both proxy and target resolvers are ready. If the ClientConn // returns a non-nil error, it calls `ResolveNow()` on the target resolver. It -// is a StateListener function of wrappingClientConn passed to the proxy resolver. +// is a StateListener function of wrappingClientConn passed to the proxy +// resolver. func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error { r.mu.Lock() defer r.mu.Unlock() @@ -265,8 +295,8 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro logger.Infof("Addresses received from proxy resolver: %s", state.Addresses) } if len(state.Endpoints) > 0 { - // We expect exactly one address per endpoint because the proxy - // resolver uses "dns" resolution. + // We expect exactly one address per endpoint because the proxy resolver + // uses "dns" resolution. r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints)) for _, endpoint := range state.Endpoints { r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...) @@ -294,11 +324,14 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro return err } -// updateTargetResolverState updates the target resolver state by storing target -// addresses, endpoints, and service config, marking the resolver as ready, and -// triggering a state update if both resolvers are ready. If the ClientConn -// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It -// is a StateListener function of wrappingClientConn passed to the target resolver. +// updateTargetResolverState is the StateListener function provided to the +// target resolver via wrappingClientConn. It updates the resolver state and +// marks the target resolver as ready. If the update includes at least one TCP +// address and the proxy resolver has not yet been constructed, it initializes +// the proxy resolver. A combined state update is triggered once both resolvers +// are ready. If all addresses are non-TCP, it proceeds without waiting for the +// proxy resolver. If ClientConn.UpdateState returns a non-nil error, +// ResolveNow() is called on the proxy resolver. func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error { r.mu.Lock() defer r.mu.Unlock() @@ -307,6 +340,31 @@ func (r *delegatingResolver) updateTargetResolverState(state resolver.State) err logger.Infof("Addresses received from target resolver: %v", state.Addresses) } r.targetResolverState = &state + // If no addresses returned by resolver have network type as tcp , do not + // wait for proxy update. + if !isTCPAddressPresent(r.targetResolverState) { + return r.cc.UpdateState(*r.targetResolverState) + } + + // The proxy resolver may be rebuilt multiple times, specifically each time + // the target resolver sends an update, even if the target resolver is built + // successfully but building the proxy resolver fails. + if len(r.proxyAddrs) == 0 { + go func() { + r.childMu.Lock() + defer r.childMu.Unlock() + if _, ok := r.proxyResolver.(nopResolver); !ok { + return + } + proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{}) + if err != nil { + r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err)) + return + } + r.proxyResolver = proxyResolver + }() + } + err := r.updateClientConnStateLocked() if err != nil { go func() { @@ -335,7 +393,8 @@ func (wcc *wrappingClientConn) UpdateState(state resolver.State) error { return wcc.stateListener(state) } -// ReportError intercepts errors from the child resolvers and passes them to ClientConn. +// ReportError intercepts errors from the child resolvers and passes them to +// ClientConn. func (wcc *wrappingClientConn) ReportError(err error) { wcc.parent.cc.ReportError(err) } @@ -346,8 +405,8 @@ func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) { wcc.UpdateState(resolver.State{Addresses: addrs}) } -// ParseServiceConfig parses the provided service config and returns an -// object that provides the parsed config. +// ParseServiceConfig parses the provided service config and returns an object +// that provides the parsed config. func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult { return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON) } diff --git a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go index 18ae17ef4ae7..48a62d4088e3 100644 --- a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go +++ b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/internal/proxyattributes" "google.golang.org/grpc/internal/resolver/delegatingresolver" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" @@ -130,15 +131,29 @@ func (s) TestDelegatingResolverNoProxyEnvVarsSet(t *testing.T) { // overwriting the previously registered DNS resolver. This allows the test to // mock the DNS resolution for the proxy resolver. It also registers the // original DNS resolver after the test is done. -func setupDNS(t *testing.T) *manual.Resolver { +func setupDNS(t *testing.T) (*manual.Resolver, chan struct{}) { t.Helper() mr := manual.NewBuilderWithScheme("dns") dnsResolverBuilder := resolver.Get("dns") resolver.Register(mr) + resolverBuilt := make(chan struct{}) + mr.BuildCallback = func(resolver.Target, resolver.ClientConn, resolver.BuildOptions) { + close(resolverBuilt) + } + t.Cleanup(func() { resolver.Register(dnsResolverBuilder) }) - return mr + return mr, resolverBuilt +} + +func mustBuildResolver(ctx context.Context, t *testing.T, buildCh chan struct{}) { + t.Helper() + select { + case <-buildCh: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for resolver to be built.") + } } // proxyAddressWithTargetAttribute creates a resolver.Address for the proxy, @@ -180,15 +195,18 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithTargetResolution(t *testing.T) targetResolver := manual.NewBuilderWithScheme("dns") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, true); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{ + {Addr: resolvedTargetTestAddr1}, + {Addr: resolvedTargetTestAddr2}, + }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -198,11 +216,13 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithTargetResolution(t *testing.T) case <-time.After(defaultTestShortTimeout): } - targetResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{ - {Addr: resolvedTargetTestAddr1}, - {Addr: resolvedTargetTestAddr2}, - }, + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -217,8 +237,8 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithTargetResolution(t *testing.T) var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timeed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -256,13 +276,18 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithNoTargetResolution(t *testing. targetResolver := manual.NewBuilderWithScheme("dns") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) proxyResolver.UpdateState(resolver.State{ Addresses: []resolver.Address{ {Addr: resolvedProxyTestAddr1}, @@ -277,8 +302,8 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithNoTargetResolution(t *testing. var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -318,15 +343,18 @@ func (s) TestDelegatingResolverwithCustomResolverAndProxy(t *testing.T) { targetResolver := manual.NewBuilderWithScheme("test") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{ + {Addr: resolvedTargetTestAddr1}, + {Addr: resolvedTargetTestAddr2}, + }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -336,11 +364,13 @@ func (s) TestDelegatingResolverwithCustomResolverAndProxy(t *testing.T) { case <-time.After(defaultTestShortTimeout): } - targetResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{ - {Addr: resolvedTargetTestAddr1}, - {Addr: resolvedTargetTestAddr2}, - }, + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -354,8 +384,8 @@ func (s) TestDelegatingResolverwithCustomResolverAndProxy(t *testing.T) { var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -400,26 +430,13 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { targetResolver := manual.NewBuilderWithScheme("test") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ - Endpoints: []resolver.Endpoint{ - {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}}, - {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr2}}}, - }, - ServiceConfig: &serviceconfig.ParseResult{}, - }) - - select { - case <-stateCh: - t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.") - case <-time.After(defaultTestShortTimeout): - } targetResolver.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{ { @@ -435,22 +452,39 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { }, ServiceConfig: &serviceconfig.ParseResult{}, }) + select { + case <-stateCh: + t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.") + case <-time.After(defaultTestShortTimeout): + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}}, + {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr2}}}, + }, + ServiceConfig: &serviceconfig.ParseResult{}, + }) wantState := resolver.State{ Endpoints: []resolver.Endpoint{ { Addresses: []resolver.Address{ proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr1), - proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr2), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr1), + proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr2), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr2), }, }, { Addresses: []resolver.Address{ proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr3), - proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr4), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr3), + proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr4), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr4), }, }, @@ -460,8 +494,8 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Contex timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -474,7 +508,7 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { // The test verifies that the delegating resolver combines unresolved proxy // host and target addresses correctly, returning addresses with the proxy host // populated and the target address included as an attribute. -func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { +func (s) TestDelegatingResolverForMultipleProxyAddress(t *testing.T) { const ( targetTestAddr = "test.com" resolvedTargetTestAddr1 = "1.1.1.1:8080" @@ -503,17 +537,16 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { targetResolver := manual.NewBuilderWithScheme("test") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) - + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ + targetResolver.UpdateState(resolver.State{ Addresses: []resolver.Address{ - {Addr: resolvedProxyTestAddr1}, - {Addr: resolvedProxyTestAddr2}, + {Addr: resolvedTargetTestAddr1}, + {Addr: resolvedTargetTestAddr2}, }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -524,10 +557,15 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { case <-time.After(defaultTestShortTimeout): } - targetResolver.UpdateState(resolver.State{ + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ Addresses: []resolver.Address{ - {Addr: resolvedTargetTestAddr1}, - {Addr: resolvedTargetTestAddr2}, + {Addr: resolvedProxyTestAddr1}, + {Addr: resolvedProxyTestAddr2}, }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -542,8 +580,8 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -586,9 +624,9 @@ func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { target := targetResolver.Scheme() + ":///" + "ignored" // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) - unblockProxyResolverClose := make(chan struct{}) + unblockProxyResolverClose := make(chan struct{}, 1) proxyResolver.CloseCallback = func() { <-unblockProxyResolverClose t.Log("Proxy resolver is closed.") @@ -607,11 +645,13 @@ func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) + // Wait for the proxy resolver to be built before calling Close. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + mustBuildResolver(ctx, t, proxyResolverBuilt) // Closing the delegating resolver will block until the test writes to the // unblockProxyResolverClose channel. go dr.Close() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() select { case <-targetResolverCloseCalled: case <-ctx.Done(): @@ -667,7 +707,7 @@ func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { target := targetResolver.Scheme() + ":///" + "ignored" // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, _, _ := createTestResolverClientConn(t) tcc.UpdateStateF = func(resolver.State) error { @@ -682,14 +722,18 @@ func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + // Updating the channel will result in an error being returned. The // delegating resolver should call call "ResolveNow" on the target resolver. proxyResolver.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() select { case <-targetResolverCalled: case <-ctx.Done(): @@ -716,18 +760,19 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) { // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("test") - targetResolverCalled := make(chan struct{}) + targetResolverCalled := make(chan struct{}, 1) targetResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { // Updating the resolver state should not deadlock. targetResolver.CC.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) - close(targetResolverCalled) + targetResolverCalled <- struct{}{} } target := targetResolver.Scheme() + ":///" + "ignored" // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) + proxyResolverCalled := make(chan struct{}) proxyResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { // Updating the resolver state should not deadlock. @@ -743,12 +788,23 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) { t.Fatalf("Failed to create delegating resolver: %v", err) } - // Call ResolveNow on the delegatingResolver and verify both children - // receive the ResolveNow call. + // ResolveNow of manual proxy resolver will not be called. Proxy resolver is + // only built when we get the first update from target resolver. Therefore + // in the first ResolveNow, proxy resolver will be a no-op resolver and only + // target resolver's ResolveNow will be called. dr.ResolveNow(resolver.ResolveNowOptions{}) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + select { + case <-targetResolverCalled: + case <-ctx.Done(): + t.Fatalf("context timed out waiting for targetResolver.ResolveNow() to be called.") + } + + mustBuildResolver(ctx, t, proxyResolverBuilt) + + dr.ResolveNow(resolver.ResolveNowOptions{}) + select { case <-targetResolverCalled: case <-ctx.Done(): @@ -760,3 +816,155 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) { t.Fatalf("context timed out waiting for proxyResolver.ResolveNow() to be called.") } } + +// Tests the scenario where a proxy is configured, and the resolver returns a +// network type other than tcp for all addresses. The test verifies that the +// delegating resolver avoids the proxy build and directly sends the update +// from target resolver to clientconn. +func (s) TestDelegatingResolverForNonTCPTarget(t *testing.T) { + const ( + targetTestAddr = "test.target" + resolvedTargetTestAddr1 = "1.1.1.1:8080" + envProxyAddr = "proxytest.com" + ) + hpfe := func(req *http.Request) (*url.URL, error) { + if req.URL.Host == targetTestAddr { + return &url.URL{ + Scheme: "https", + Host: envProxyAddr, + }, nil + } + t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr) + return nil, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + defer func() { + delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe + }() + + // Manual resolver to control the target resolution. + targetResolver := manual.NewBuilderWithScheme("test") + target := targetResolver.Scheme() + ":///" + targetTestAddr + // Set up a manual DNS resolver to control the proxy address resolution. + _, proxyResolverBuilt := setupDNS(t) + + tcc, stateCh, _ := createTestResolverClientConn(t) + if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + + // Set network to anything other than tcp. + nonTCPAddr := networktype.Set(resolver.Address{Addr: resolvedTargetTestAddr1}, "unix") + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{nonTCPAddr}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + var gotState resolver.State + select { + case gotState = <-stateCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout when waiting for a state update from the delegating resolver") + } + + // Verify that the delegating resolver doesn't call proxy resolver's + // UpdateState since we have no tcp address + select { + case <-proxyResolverBuilt: + t.Fatal("Unexpected call to proxy resolver update state") + case <-time.After(defaultTestShortTimeout): + } + + wantState := resolver.State{ + Addresses: []resolver.Address{nonTCPAddr}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + } + + // Verify that the state clientconn receives is same as updated by target + // resolver, since we want to avoid proxy for any network type apart from + // tcp. + if diff := cmp.Diff(gotState, wantState); diff != "" { + t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%s", diff) + } +} + +// Tests the scenario where a proxy is configured, and the resolver returns tcp +// and non-tcp addresses. The test verifies that the delegating resolver doesn't +// add proxyatrribute to adresses with network type other than tcp , but adds +// the proxyattribute to addresses with network type tcp. +func (s) TestDelegatingResolverForMixNetworkType(t *testing.T) { + const ( + targetTestAddr = "test.target" + resolvedTargetTestAddr1 = "1.1.1.1:8080" + resolvedTargetTestAddr2 = "2.2.2.2:8080" + envProxyAddr = "proxytest.com" + ) + hpfe := func(req *http.Request) (*url.URL, error) { + if req.URL.Host == targetTestAddr { + return &url.URL{ + Scheme: "https", + Host: envProxyAddr, + }, nil + } + t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr) + return nil, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + defer func() { + delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe + }() + + // Manual resolver to control the target resolution. + targetResolver := manual.NewBuilderWithScheme("test") + target := targetResolver.Scheme() + ":///" + targetTestAddr + // Set up a manual DNS resolver to control the proxy address resolution. + proxyResolver, proxyResolverBuilt := setupDNS(t) + + tcc, stateCh, _ := createTestResolverClientConn(t) + if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + // Set network to anything other than tcp. + nonTCPAddr := networktype.Set(resolver.Address{Addr: resolvedTargetTestAddr1}, "unix") + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{nonTCPAddr, {Addr: resolvedTargetTestAddr2}}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr, {Addr: resolvedTargetTestAddr2}}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + select { + case <-stateCh: + t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.") + case <-time.After(defaultTestShortTimeout): + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: envProxyAddr}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + var gotState resolver.State + select { + case gotState = <-stateCh: + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") + } + wantState := resolver.State{ + Addresses: []resolver.Address{nonTCPAddr, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr2)}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr2)}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + } + + if diff := cmp.Diff(gotState, wantState); diff != "" { + t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%v", diff) + } +} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 513dbb93d550..e12ea02ac8cb 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -176,7 +176,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error return fn(ctx, address) } if !ok { - networkType, address = parseDialTarget(address) + networkType, address = ParseDialTarget(address) } if opts, present := proxyattributes.Get(addr); present { return proxyDial(ctx, addr, grpcUA, opts) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 3613d7b64817..f997f9fdb5d0 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -439,8 +439,8 @@ func getWriteBufferPool(size int) *sync.Pool { return pool } -// parseDialTarget returns the network and address to pass to dialer. -func parseDialTarget(target string) (string, string) { +// ParseDialTarget returns the network and address to pass to dialer. +func ParseDialTarget(target string) (string, string) { net := "tcp" m1 := strings.Index(target, ":") m2 := strings.Index(target, ":/") diff --git a/internal/transport/http_util_test.go b/internal/transport/http_util_test.go index 5a259d43cdc2..5eb466f38561 100644 --- a/internal/transport/http_util_test.go +++ b/internal/transport/http_util_test.go @@ -211,9 +211,9 @@ func (s) TestParseDialTarget(t *testing.T) { {"dns:///google.com", "tcp", "dns:///google.com"}, {"/unix/socket/address", "tcp", "/unix/socket/address"}, } { - gotNet, gotAddr := parseDialTarget(test.target) + gotNet, gotAddr := ParseDialTarget(test.target) if gotNet != test.wantNet || gotAddr != test.wantAddr { - t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr) + t.Errorf("ParseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr) } } }