Skip to content

cherry-pick #8215, #8302 and #8304 to v1.71.x branch #8298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 116 additions & 57 deletions internal/resolver/delegatingresolver/delegatingresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand All @@ -40,14 +42,17 @@

// delegatingResolver manages both target URI and proxy address resolution by
// delegating these tasks to separate child resolvers. Essentially, it acts as
// a intermediary between the gRPC ClientConn and the child resolvers.
// an intermediary between the gRPC ClientConn and the child resolvers.
//
// It implements the [resolver.Resolver] interface.
type delegatingResolver struct {
target resolver.Target // parsed target URI to be resolved
cc resolver.ClientConn // gRPC ClientConn
proxyURL *url.URL // proxy URL, derived from proxy environment and target

// We do not hold both mu and childMu in the same goroutine. Avoid holding
// both locks when calling into the child, as the child resolver may
// synchronously callback into the channel.
mu sync.Mutex // protects all the fields below
targetResolverState *resolver.State // state of the target resolver
proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured
Expand All @@ -66,8 +71,8 @@

func (nopResolver) Close() {}

// proxyURLForTarget determines the proxy URL for the given address based on
// the environment. It can return the following:
// proxyURLForTarget determines the proxy URL for the given address based on the
// environment. It can return the following:
// - nil URL, nil error: No proxy is configured or the address is excluded
// using the `NO_PROXY` environment variable or if req.URL.Host is
// "localhost" (with or without // a port number)
Expand All @@ -86,7 +91,8 @@
// resolvers:
// - one to resolve the proxy address specified using the supported
// environment variables. This uses the registered resolver for the "dns"
// scheme.
// scheme. It is lazily built when a target resolver update contains at least
// one TCP address.
// - one to resolve the target URI using the resolver specified by the scheme
// in the target URI or specified by the user using the WithResolvers dial
// option. As a special case, if the target URI's scheme is "dns" and a
Expand All @@ -95,8 +101,10 @@
// resolution is enabled using the dial option.
func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) {
r := &delegatingResolver{
target: target,
cc: cc,
target: target,
cc: cc,
proxyResolver: nopResolver{},
targetResolver: nopResolver{},
}

var err error
Expand All @@ -123,37 +131,26 @@
// resolution should be handled by the proxy, not the client. Therefore, we
// bypass the target resolver and store the unresolved target address.
if target.URL.Scheme == "dns" && !targetResolutionEnabled {
state := resolver.State{
r.targetResolverState = &resolver.State{
Addresses: []resolver.Address{{Addr: target.Endpoint()}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}},
}
r.targetResolverState = &state
} else {
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
}
}

if r.proxyResolver, err = r.proxyURIResolver(opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: failed to build resolver for proxy URL %q: %v", r.proxyURL, err)
r.updateTargetResolverState(*r.targetResolverState)
return r, nil
}

if r.targetResolver == nil {
r.targetResolver = nopResolver{}
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.proxyResolver == nil {
r.proxyResolver = nopResolver{}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)

Check warning on line 146 in internal/resolver/delegatingresolver/delegatingresolver.go

View check run for this annotation

Codecov / codecov/patch

internal/resolver/delegatingresolver/delegatingresolver.go#L146

Added line #L146 was not covered by tests
}
return r, nil
}

// proxyURIResolver creates a resolver for resolving proxy URIs using the
// "dns" scheme. It adjusts the proxyURL to conform to the "dns:///" format and
// builds a resolver with a wrappingClientConn to capture resolved addresses.
// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns"
// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds
// a resolver with a wrappingClientConn to capture resolved addresses.
func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) {
proxyBuilder := resolver.Get("dns")
if proxyBuilder == nil {
Expand Down Expand Up @@ -189,18 +186,43 @@
r.proxyResolver = nil
}

// updateClientConnStateLocked creates a list of combined addresses by
// pairing each proxy address with every target address. For each pair, it
// generates a new [resolver.Address] using the proxy address, and adding the
// target address as the attribute along with user info. It returns nil if
// either resolver has not sent update even once and returns the error from
// ClientConn update once both resolvers have sent update atleast once.
func networkTypeFromAddr(addr resolver.Address) string {
networkType, ok := networktype.Get(addr)
if !ok {
networkType, _ = transport.ParseDialTarget(addr.Addr)
}
return networkType
}

func isTCPAddressPresent(state *resolver.State) bool {
for _, addr := range state.Addresses {
if networkType := networkTypeFromAddr(addr); networkType == "tcp" {
return true
}
}
for _, endpoint := range state.Endpoints {
for _, addr := range endpoint.Addresses {
if networktype := networkTypeFromAddr(addr); networktype == "tcp" {
return true
}
}
}
return false
}

// updateClientConnStateLocked constructs a combined list of addresses by
// pairing each proxy address with every target address of type TCP. For each
// pair, it creates a new [resolver.Address] using the proxy address and
// attaches the corresponding target address and user info as attributes. Target
// addresses that are not of type TCP are appended to the list as-is. The
// function returns nil if either resolver has not yet provided an update, and
// returns the result of ClientConn.UpdateState once both resolvers have
// provided at least one update.
func (r *delegatingResolver) updateClientConnStateLocked() error {
if r.targetResolverState == nil || r.proxyAddrs == nil {
return nil
}

curState := *r.targetResolverState
// If multiple resolved proxy addresses are present, we send only the
// unresolved proxy host and let net.Dial handle the proxy host name
// resolution when creating the transport. Sending all resolved addresses
Expand All @@ -218,24 +240,30 @@
}
var addresses []resolver.Address
for _, targetAddr := range (*r.targetResolverState).Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addresses = append(addresses, targetAddr)
continue
}
addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}

// Create a list of combined endpoints by pairing all proxy endpoints
// with every target endpoint. Each time, it constructs a new
// [resolver.Endpoint] using the all addresses from all the proxy endpoint
// and the target addresses from one endpoint. The target address and user
// information from the proxy URL are added as attributes to the proxy
// address.The resulting list of addresses is then grouped into endpoints,
// covering all combinations of proxy and target endpoints.
// For each target endpoint, construct a new [resolver.Endpoint] that
// includes all addresses from all proxy endpoints and the addresses from
// that target endpoint, preserving the number of target endpoints.
var endpoints []resolver.Endpoint
for _, endpt := range (*r.targetResolverState).Endpoints {
var addrs []resolver.Address
for _, proxyAddr := range r.proxyAddrs {
for _, targetAddr := range endpt.Addresses {
for _, targetAddr := range endpt.Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addrs = append(addrs, targetAddr)
continue
}
for _, proxyAddr := range r.proxyAddrs {
addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
Expand All @@ -246,8 +274,9 @@
}
// Use the targetResolverState for its service config and attributes
// contents. The state update is only sent after both the target and proxy
// resolvers have sent their updates, and curState has been updated with
// the combined addresses.
// resolvers have sent their updates, and curState has been updated with the
// combined addresses.
curState := *r.targetResolverState
curState.Addresses = addresses
curState.Endpoints = endpoints
return r.cc.UpdateState(curState)
Expand All @@ -257,16 +286,17 @@
// addresses and endpoints, marking the resolver as ready, and triggering a
// state update if both proxy and target resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the target resolver. It
// is a StateListener function of wrappingClientConn passed to the proxy resolver.
// is a StateListener function of wrappingClientConn passed to the proxy
// resolver.
func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
if logger.V(2) {
logger.Infof("Addresses received from proxy resolver: %s", state.Addresses)
}
if len(state.Endpoints) > 0 {
// We expect exactly one address per endpoint because the proxy
// resolver uses "dns" resolution.
// We expect exactly one address per endpoint because the proxy resolver
// uses "dns" resolution.
r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints))
for _, endpoint := range state.Endpoints {
r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...)
Expand Down Expand Up @@ -294,11 +324,14 @@
return err
}

// updateTargetResolverState updates the target resolver state by storing target
// addresses, endpoints, and service config, marking the resolver as ready, and
// triggering a state update if both resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It
// is a StateListener function of wrappingClientConn passed to the target resolver.
// updateTargetResolverState is the StateListener function provided to the
// target resolver via wrappingClientConn. It updates the resolver state and
// marks the target resolver as ready. If the update includes at least one TCP
// address and the proxy resolver has not yet been constructed, it initializes
// the proxy resolver. A combined state update is triggered once both resolvers
// are ready. If all addresses are non-TCP, it proceeds without waiting for the
// proxy resolver. If ClientConn.UpdateState returns a non-nil error,
// ResolveNow() is called on the proxy resolver.
func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -307,6 +340,31 @@
logger.Infof("Addresses received from target resolver: %v", state.Addresses)
}
r.targetResolverState = &state
// If no addresses returned by resolver have network type as tcp , do not
// wait for proxy update.
if !isTCPAddressPresent(r.targetResolverState) {
return r.cc.UpdateState(*r.targetResolverState)
}

// The proxy resolver may be rebuilt multiple times, specifically each time
// the target resolver sends an update, even if the target resolver is built
// successfully but building the proxy resolver fails.
if len(r.proxyAddrs) == 0 {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if _, ok := r.proxyResolver.(nopResolver); !ok {
return
}
proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{})
if err != nil {
r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err))
return
}

Check warning on line 363 in internal/resolver/delegatingresolver/delegatingresolver.go

View check run for this annotation

Codecov / codecov/patch

internal/resolver/delegatingresolver/delegatingresolver.go#L361-L363

Added lines #L361 - L363 were not covered by tests
r.proxyResolver = proxyResolver
}()
}

err := r.updateClientConnStateLocked()
if err != nil {
go func() {
Expand Down Expand Up @@ -335,7 +393,8 @@
return wcc.stateListener(state)
}

// ReportError intercepts errors from the child resolvers and passes them to ClientConn.
// ReportError intercepts errors from the child resolvers and passes them to
// ClientConn.
func (wcc *wrappingClientConn) ReportError(err error) {
wcc.parent.cc.ReportError(err)
}
Expand All @@ -346,8 +405,8 @@
wcc.UpdateState(resolver.State{Addresses: addrs})
}

// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
// ParseServiceConfig parses the provided service config and returns an object
// that provides the parsed config.
func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON)
}
Loading
Loading