forked from toolshed/abra
		
	We were running behind, and there were quite a few deprecations to update. Most of them were in the upstream copy/pasta package, and the changes seem quite minimal.
		
			
				
	
	
		
			933 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			933 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2024 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| // Package pickfirstleaf contains the pick_first load balancing policy which
 | |
| // will be the universal leaf policy after dualstack changes are implemented.
 | |
| //
 | |
| // # Experimental
 | |
| //
 | |
| // Notice: This package is EXPERIMENTAL and may be changed or removed in a
 | |
| // later release.
 | |
| package pickfirstleaf
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"net/netip"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"google.golang.org/grpc/balancer"
 | |
| 	"google.golang.org/grpc/balancer/pickfirst/internal"
 | |
| 	"google.golang.org/grpc/connectivity"
 | |
| 	expstats "google.golang.org/grpc/experimental/stats"
 | |
| 	"google.golang.org/grpc/grpclog"
 | |
| 	"google.golang.org/grpc/internal/envconfig"
 | |
| 	internalgrpclog "google.golang.org/grpc/internal/grpclog"
 | |
| 	"google.golang.org/grpc/internal/pretty"
 | |
| 	"google.golang.org/grpc/resolver"
 | |
| 	"google.golang.org/grpc/serviceconfig"
 | |
| )
 | |
| 
 | |
| func init() {
 | |
| 	if envconfig.NewPickFirstEnabled {
 | |
| 		// Register as the default pick_first balancer.
 | |
| 		Name = "pick_first"
 | |
| 	}
 | |
| 	balancer.Register(pickfirstBuilder{})
 | |
| }
 | |
| 
 | |
// enableHealthListenerKeyType is a unique key type used in resolver
// attributes to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}

// managedByPickfirstKeyType is an attribute key type used to inform Outlier
// Detection that the generic health listener is being used.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when
// implementing the dualstack design. This is a hack. Once Dualstack is
// completed, outlier detection will stop sending ejection updates through
// the connectivity listener.
type managedByPickfirstKeyType struct{}
 | |
| 
 | |
| var (
 | |
| 	logger = grpclog.Component("pick-first-leaf-lb")
 | |
| 	// Name is the name of the pick_first_leaf balancer.
 | |
| 	// It is changed to "pick_first" in init() if this balancer is to be
 | |
| 	// registered as the default pickfirst.
 | |
| 	Name                 = "pick_first_leaf"
 | |
| 	disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
 | |
| 		Name:        "grpc.lb.pick_first.disconnections",
 | |
| 		Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
 | |
| 		Unit:        "disconnection",
 | |
| 		Labels:      []string{"grpc.target"},
 | |
| 		Default:     false,
 | |
| 	})
 | |
| 	connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
 | |
| 		Name:        "grpc.lb.pick_first.connection_attempts_succeeded",
 | |
| 		Description: "EXPERIMENTAL. Number of successful connection attempts.",
 | |
| 		Unit:        "attempt",
 | |
| 		Labels:      []string{"grpc.target"},
 | |
| 		Default:     false,
 | |
| 	})
 | |
| 	connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
 | |
| 		Name:        "grpc.lb.pick_first.connection_attempts_failed",
 | |
| 		Description: "EXPERIMENTAL. Number of failed connection attempts.",
 | |
| 		Unit:        "attempt",
 | |
| 		Labels:      []string{"grpc.target"},
 | |
| 		Default:     false,
 | |
| 	})
 | |
| )
 | |
| 
 | |
// logPrefix is the format string for the per-balancer log prefix; the %p verb
// receives the balancer pointer.
// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "

// connectionDelayInterval is the time to wait for during the happy eyeballs
// pass before starting the next connection attempt.
const connectionDelayInterval = 250 * time.Millisecond
 | |
| 
 | |
// ipAddrFamily classifies an address string by IP family.
type ipAddrFamily int

const (
	// ipAddrFamilyUnknown represents strings that can't be parsed as an IP
	// address.
	ipAddrFamilyUnknown ipAddrFamily = 0
	// ipAddrFamilyV4 represents IPv4 addresses.
	ipAddrFamilyV4 ipAddrFamily = 1
	// ipAddrFamilyV6 represents IPv6 addresses.
	ipAddrFamilyV6 ipAddrFamily = 2
)
 | |
| 
 | |
// pickfirstBuilder is the stateless builder that init registers with the
// balancer registry; it builds pickfirstBalancer instances and parses their
// configuration.
type pickfirstBuilder struct{}
 | |
| 
 | |
| func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer {
 | |
| 	b := &pickfirstBalancer{
 | |
| 		cc:              cc,
 | |
| 		target:          bo.Target.String(),
 | |
| 		metricsRecorder: cc.MetricsRecorder(),
 | |
| 
 | |
| 		subConns:              resolver.NewAddressMap(),
 | |
| 		state:                 connectivity.Connecting,
 | |
| 		cancelConnectionTimer: func() {},
 | |
| 	}
 | |
| 	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| func (b pickfirstBuilder) Name() string {
 | |
| 	return Name
 | |
| }
 | |
| 
 | |
| func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
 | |
| 	var cfg pfConfig
 | |
| 	if err := json.Unmarshal(js, &cfg); err != nil {
 | |
| 		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
 | |
| 	}
 | |
| 	return cfg, nil
 | |
| }
 | |
| 
 | |
| // EnableHealthListener updates the state to configure pickfirst for using a
 | |
| // generic health listener.
 | |
| func EnableHealthListener(state resolver.State) resolver.State {
 | |
| 	state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true)
 | |
| 	return state
 | |
| }
 | |
| 
 | |
| // IsManagedByPickfirst returns whether an address belongs to a SubConn
 | |
| // managed by the pickfirst LB policy.
 | |
| // TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable
 | |
| // outlier_detection via the with connectivity listener when using pick_first.
 | |
| // Once Dualstack changes are complete, all SubConns will be created by
 | |
| // pick_first and outlier detection will only use the health listener for
 | |
| // ejection. This hack can then be removed.
 | |
| func IsManagedByPickfirst(addr resolver.Address) bool {
 | |
| 	return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil
 | |
| }
 | |
| 
 | |
| type pfConfig struct {
 | |
| 	serviceconfig.LoadBalancingConfig `json:"-"`
 | |
| 
 | |
| 	// If set to true, instructs the LB policy to shuffle the order of the list
 | |
| 	// of endpoints received from the name resolver before attempting to
 | |
| 	// connect to them.
 | |
| 	ShuffleAddressList bool `json:"shuffleAddressList"`
 | |
| }
 | |
| 
 | |
| // scData keeps track of the current state of the subConn.
 | |
| // It is not safe for concurrent access.
 | |
| type scData struct {
 | |
| 	// The following fields are initialized at build time and read-only after
 | |
| 	// that.
 | |
| 	subConn balancer.SubConn
 | |
| 	addr    resolver.Address
 | |
| 
 | |
| 	rawConnectivityState connectivity.State
 | |
| 	// The effective connectivity state based on raw connectivity, health state
 | |
| 	// and after following sticky TransientFailure behaviour defined in A62.
 | |
| 	effectiveState              connectivity.State
 | |
| 	lastErr                     error
 | |
| 	connectionFailedInFirstPass bool
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
 | |
| 	addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true)
 | |
| 	sd := &scData{
 | |
| 		rawConnectivityState: connectivity.Idle,
 | |
| 		effectiveState:       connectivity.Idle,
 | |
| 		addr:                 addr,
 | |
| 	}
 | |
| 	sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
 | |
| 		StateListener: func(state balancer.SubConnState) {
 | |
| 			b.updateSubConnState(sd, state)
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	sd.subConn = sc
 | |
| 	return sd, nil
 | |
| }
 | |
| 
 | |
| type pickfirstBalancer struct {
 | |
| 	// The following fields are initialized at build time and read-only after
 | |
| 	// that and therefore do not need to be guarded by a mutex.
 | |
| 	logger          *internalgrpclog.PrefixLogger
 | |
| 	cc              balancer.ClientConn
 | |
| 	target          string
 | |
| 	metricsRecorder expstats.MetricsRecorder // guaranteed to be non nil
 | |
| 
 | |
| 	// The mutex is used to ensure synchronization of updates triggered
 | |
| 	// from the idle picker and the already serialized resolver,
 | |
| 	// SubConn state updates.
 | |
| 	mu sync.Mutex
 | |
| 	// State reported to the channel based on SubConn states and resolver
 | |
| 	// updates.
 | |
| 	state connectivity.State
 | |
| 	// scData for active subonns mapped by address.
 | |
| 	subConns              *resolver.AddressMap
 | |
| 	addressList           addressList
 | |
| 	firstPass             bool
 | |
| 	numTF                 int
 | |
| 	cancelConnectionTimer func()
 | |
| 	healthCheckingEnabled bool
 | |
| }
 | |
| 
 | |
| // ResolverError is called by the ClientConn when the name resolver produces
 | |
| // an error or when pickfirst determined the resolver update to be invalid.
 | |
| func (b *pickfirstBalancer) ResolverError(err error) {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	b.resolverErrorLocked(err)
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) resolverErrorLocked(err error) {
 | |
| 	if b.logger.V(2) {
 | |
| 		b.logger.Infof("Received error from the name resolver: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// The picker will not change since the balancer does not currently
 | |
| 	// report an error. If the balancer hasn't received a single good resolver
 | |
| 	// update yet, transition to TRANSIENT_FAILURE.
 | |
| 	if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
 | |
| 		if b.logger.V(2) {
 | |
| 			b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	b.updateBalancerState(balancer.State{
 | |
| 		ConnectivityState: connectivity.TransientFailure,
 | |
| 		Picker:            &picker{err: fmt.Errorf("name resolver error: %v", err)},
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	b.cancelConnectionTimer()
 | |
| 	if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
 | |
| 		// Cleanup state pertaining to the previous resolver state.
 | |
| 		// Treat an empty address list like an error by calling b.ResolverError.
 | |
| 		b.closeSubConnsLocked()
 | |
| 		b.addressList.updateAddrs(nil)
 | |
| 		b.resolverErrorLocked(errors.New("produced zero addresses"))
 | |
| 		return balancer.ErrBadResolverState
 | |
| 	}
 | |
| 	b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil
 | |
| 	cfg, ok := state.BalancerConfig.(pfConfig)
 | |
| 	if state.BalancerConfig != nil && !ok {
 | |
| 		return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
 | |
| 	}
 | |
| 
 | |
| 	if b.logger.V(2) {
 | |
| 		b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
 | |
| 	}
 | |
| 
 | |
| 	var newAddrs []resolver.Address
 | |
| 	if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
 | |
| 		// Perform the optional shuffling described in gRFC A62. The shuffling
 | |
| 		// will change the order of endpoints but not touch the order of the
 | |
| 		// addresses within each endpoint. - A61
 | |
| 		if cfg.ShuffleAddressList {
 | |
| 			endpoints = append([]resolver.Endpoint{}, endpoints...)
 | |
| 			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
 | |
| 		}
 | |
| 
 | |
| 		// "Flatten the list by concatenating the ordered list of addresses for
 | |
| 		// each of the endpoints, in order." - A61
 | |
| 		for _, endpoint := range endpoints {
 | |
| 			newAddrs = append(newAddrs, endpoint.Addresses...)
 | |
| 		}
 | |
| 	} else {
 | |
| 		// Endpoints not set, process addresses until we migrate resolver
 | |
| 		// emissions fully to Endpoints. The top channel does wrap emitted
 | |
| 		// addresses with endpoints, however some balancers such as weighted
 | |
| 		// target do not forward the corresponding correct endpoints down/split
 | |
| 		// endpoints properly. Once all balancers correctly forward endpoints
 | |
| 		// down, can delete this else conditional.
 | |
| 		newAddrs = state.ResolverState.Addresses
 | |
| 		if cfg.ShuffleAddressList {
 | |
| 			newAddrs = append([]resolver.Address{}, newAddrs...)
 | |
| 			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// If an address appears in multiple endpoints or in the same endpoint
 | |
| 	// multiple times, we keep it only once. We will create only one SubConn
 | |
| 	// for the address because an AddressMap is used to store SubConns.
 | |
| 	// Not de-duplicating would result in attempting to connect to the same
 | |
| 	// SubConn multiple times in the same pass. We don't want this.
 | |
| 	newAddrs = deDupAddresses(newAddrs)
 | |
| 	newAddrs = interleaveAddresses(newAddrs)
 | |
| 
 | |
| 	prevAddr := b.addressList.currentAddress()
 | |
| 	prevSCData, found := b.subConns.Get(prevAddr)
 | |
| 	prevAddrsCount := b.addressList.size()
 | |
| 	isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready
 | |
| 	b.addressList.updateAddrs(newAddrs)
 | |
| 
 | |
| 	// If the previous ready SubConn exists in new address list,
 | |
| 	// keep this connection and don't create new SubConns.
 | |
| 	if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	b.reconcileSubConnsLocked(newAddrs)
 | |
| 	// If it's the first resolver update or the balancer was already READY
 | |
| 	// (but the new address list does not contain the ready SubConn) or
 | |
| 	// CONNECTING, enter CONNECTING.
 | |
| 	// We may be in TRANSIENT_FAILURE due to a previous empty address list,
 | |
| 	// we should still enter CONNECTING because the sticky TF behaviour
 | |
| 	//  mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
 | |
| 	// due to connectivity failures.
 | |
| 	if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 {
 | |
| 		// Start connection attempt at first address.
 | |
| 		b.forceUpdateConcludedStateLocked(balancer.State{
 | |
| 			ConnectivityState: connectivity.Connecting,
 | |
| 			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 | |
| 		})
 | |
| 		b.startFirstPassLocked()
 | |
| 	} else if b.state == connectivity.TransientFailure {
 | |
| 		// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
 | |
| 		// we're READY. See A62.
 | |
| 		b.startFirstPassLocked()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // UpdateSubConnState is unused as a StateListener is always registered when
 | |
| // creating SubConns.
 | |
| func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
 | |
| 	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) Close() {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	b.closeSubConnsLocked()
 | |
| 	b.cancelConnectionTimer()
 | |
| 	b.state = connectivity.Shutdown
 | |
| }
 | |
| 
 | |
| // ExitIdle moves the balancer out of idle state. It can be called concurrently
 | |
| // by the idlePicker and clientConn so access to variables should be
 | |
| // synchronized.
 | |
| func (b *pickfirstBalancer) ExitIdle() {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	if b.state == connectivity.Idle {
 | |
| 		b.startFirstPassLocked()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) startFirstPassLocked() {
 | |
| 	b.firstPass = true
 | |
| 	b.numTF = 0
 | |
| 	// Reset the connection attempt record for existing SubConns.
 | |
| 	for _, sd := range b.subConns.Values() {
 | |
| 		sd.(*scData).connectionFailedInFirstPass = false
 | |
| 	}
 | |
| 	b.requestConnectionLocked()
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) closeSubConnsLocked() {
 | |
| 	for _, sd := range b.subConns.Values() {
 | |
| 		sd.(*scData).subConn.Shutdown()
 | |
| 	}
 | |
| 	b.subConns = resolver.NewAddressMap()
 | |
| }
 | |
| 
 | |
| // deDupAddresses ensures that each address appears only once in the slice.
 | |
| func deDupAddresses(addrs []resolver.Address) []resolver.Address {
 | |
| 	seenAddrs := resolver.NewAddressMap()
 | |
| 	retAddrs := []resolver.Address{}
 | |
| 
 | |
| 	for _, addr := range addrs {
 | |
| 		if _, ok := seenAddrs.Get(addr); ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		retAddrs = append(retAddrs, addr)
 | |
| 	}
 | |
| 	return retAddrs
 | |
| }
 | |
| 
 | |
| // interleaveAddresses interleaves addresses of both families (IPv4 and IPv6)
 | |
| // as per RFC-8305 section 4.
 | |
| // Whichever address family is first in the list is followed by an address of
 | |
| // the other address family; that is, if the first address in the list is IPv6,
 | |
| // then the first IPv4 address should be moved up in the list to be second in
 | |
| // the list. It doesn't support configuring "First Address Family Count", i.e.
 | |
| // there will always be a single member of the first address family at the
 | |
| // beginning of the interleaved list.
 | |
| // Addresses that are neither IPv4 nor IPv6 are treated as part of a third
 | |
| // "unknown" family for interleaving.
 | |
| // See: https://datatracker.ietf.org/doc/html/rfc8305#autoid-6
 | |
| func interleaveAddresses(addrs []resolver.Address) []resolver.Address {
 | |
| 	familyAddrsMap := map[ipAddrFamily][]resolver.Address{}
 | |
| 	interleavingOrder := []ipAddrFamily{}
 | |
| 	for _, addr := range addrs {
 | |
| 		family := addressFamily(addr.Addr)
 | |
| 		if _, found := familyAddrsMap[family]; !found {
 | |
| 			interleavingOrder = append(interleavingOrder, family)
 | |
| 		}
 | |
| 		familyAddrsMap[family] = append(familyAddrsMap[family], addr)
 | |
| 	}
 | |
| 
 | |
| 	interleavedAddrs := make([]resolver.Address, 0, len(addrs))
 | |
| 
 | |
| 	for curFamilyIdx := 0; len(interleavedAddrs) < len(addrs); curFamilyIdx = (curFamilyIdx + 1) % len(interleavingOrder) {
 | |
| 		// Some IP types may have fewer addresses than others, so we look for
 | |
| 		// the next type that has a remaining member to add to the interleaved
 | |
| 		// list.
 | |
| 		family := interleavingOrder[curFamilyIdx]
 | |
| 		remainingMembers := familyAddrsMap[family]
 | |
| 		if len(remainingMembers) > 0 {
 | |
| 			interleavedAddrs = append(interleavedAddrs, remainingMembers[0])
 | |
| 			familyAddrsMap[family] = remainingMembers[1:]
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return interleavedAddrs
 | |
| }
 | |
| 
 | |
| // addressFamily returns the ipAddrFamily after parsing the address string.
 | |
| // If the address isn't of the format "ip-address:port", it returns
 | |
| // ipAddrFamilyUnknown. The address may be valid even if it's not an IP when
 | |
| // using a resolver like passthrough where the address may be a hostname in
 | |
| // some format that the dialer can resolve.
 | |
| func addressFamily(address string) ipAddrFamily {
 | |
| 	// Parse the IP after removing the port.
 | |
| 	host, _, err := net.SplitHostPort(address)
 | |
| 	if err != nil {
 | |
| 		return ipAddrFamilyUnknown
 | |
| 	}
 | |
| 	ip, err := netip.ParseAddr(host)
 | |
| 	if err != nil {
 | |
| 		return ipAddrFamilyUnknown
 | |
| 	}
 | |
| 	switch {
 | |
| 	case ip.Is4() || ip.Is4In6():
 | |
| 		return ipAddrFamilyV4
 | |
| 	case ip.Is6():
 | |
| 		return ipAddrFamilyV6
 | |
| 	default:
 | |
| 		return ipAddrFamilyUnknown
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // reconcileSubConnsLocked updates the active subchannels based on a new address
 | |
| // list from the resolver. It does this by:
 | |
| //   - closing subchannels: any existing subchannels associated with addresses
 | |
| //     that are no longer in the updated list are shut down.
 | |
| //   - removing subchannels: entries for these closed subchannels are removed
 | |
| //     from the subchannel map.
 | |
| //
 | |
| // This ensures that the subchannel map accurately reflects the current set of
 | |
| // addresses received from the name resolver.
 | |
| func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
 | |
| 	newAddrsMap := resolver.NewAddressMap()
 | |
| 	for _, addr := range newAddrs {
 | |
| 		newAddrsMap.Set(addr, true)
 | |
| 	}
 | |
| 
 | |
| 	for _, oldAddr := range b.subConns.Keys() {
 | |
| 		if _, ok := newAddrsMap.Get(oldAddr); ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		val, _ := b.subConns.Get(oldAddr)
 | |
| 		val.(*scData).subConn.Shutdown()
 | |
| 		b.subConns.Delete(oldAddr)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
 | |
| // becomes ready, which means that all other subConn must be shutdown.
 | |
| func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
 | |
| 	b.cancelConnectionTimer()
 | |
| 	for _, v := range b.subConns.Values() {
 | |
| 		sd := v.(*scData)
 | |
| 		if sd.subConn != selected.subConn {
 | |
| 			sd.subConn.Shutdown()
 | |
| 		}
 | |
| 	}
 | |
| 	b.subConns = resolver.NewAddressMap()
 | |
| 	b.subConns.Set(selected.addr, selected)
 | |
| }
 | |
| 
 | |
| // requestConnectionLocked starts connecting on the subchannel corresponding to
 | |
| // the current address. If no subchannel exists, one is created. If the current
 | |
| // subchannel is in TransientFailure, a connection to the next address is
 | |
| // attempted until a subchannel is found.
 | |
| func (b *pickfirstBalancer) requestConnectionLocked() {
 | |
| 	if !b.addressList.isValid() {
 | |
| 		return
 | |
| 	}
 | |
| 	var lastErr error
 | |
| 	for valid := true; valid; valid = b.addressList.increment() {
 | |
| 		curAddr := b.addressList.currentAddress()
 | |
| 		sd, ok := b.subConns.Get(curAddr)
 | |
| 		if !ok {
 | |
| 			var err error
 | |
| 			// We want to assign the new scData to sd from the outer scope,
 | |
| 			// hence we can't use := below.
 | |
| 			sd, err = b.newSCData(curAddr)
 | |
| 			if err != nil {
 | |
| 				// This should never happen, unless the clientConn is being shut
 | |
| 				// down.
 | |
| 				if b.logger.V(2) {
 | |
| 					b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err)
 | |
| 				}
 | |
| 				// Do nothing, the LB policy will be closed soon.
 | |
| 				return
 | |
| 			}
 | |
| 			b.subConns.Set(curAddr, sd)
 | |
| 		}
 | |
| 
 | |
| 		scd := sd.(*scData)
 | |
| 		switch scd.rawConnectivityState {
 | |
| 		case connectivity.Idle:
 | |
| 			scd.subConn.Connect()
 | |
| 			b.scheduleNextConnectionLocked()
 | |
| 			return
 | |
| 		case connectivity.TransientFailure:
 | |
| 			// The SubConn is being re-used and failed during a previous pass
 | |
| 			// over the addressList. It has not completed backoff yet.
 | |
| 			// Mark it as having failed and try the next address.
 | |
| 			scd.connectionFailedInFirstPass = true
 | |
| 			lastErr = scd.lastErr
 | |
| 			continue
 | |
| 		case connectivity.Connecting:
 | |
| 			// Wait for the connection attempt to complete or the timer to fire
 | |
| 			// before attempting the next address.
 | |
| 			b.scheduleNextConnectionLocked()
 | |
| 			return
 | |
| 		default:
 | |
| 			b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState)
 | |
| 			return
 | |
| 
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
 | |
| 	// first pass if possible.
 | |
| 	b.endFirstPassIfPossibleLocked(lastErr)
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) scheduleNextConnectionLocked() {
 | |
| 	b.cancelConnectionTimer()
 | |
| 	if !b.addressList.hasNext() {
 | |
| 		return
 | |
| 	}
 | |
| 	curAddr := b.addressList.currentAddress()
 | |
| 	cancelled := false // Access to this is protected by the balancer's mutex.
 | |
| 	closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() {
 | |
| 		b.mu.Lock()
 | |
| 		defer b.mu.Unlock()
 | |
| 		// If the scheduled task is cancelled while acquiring the mutex, return.
 | |
| 		if cancelled {
 | |
| 			return
 | |
| 		}
 | |
| 		if b.logger.V(2) {
 | |
| 			b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr)
 | |
| 		}
 | |
| 		if b.addressList.increment() {
 | |
| 			b.requestConnectionLocked()
 | |
| 		}
 | |
| 	})
 | |
| 	// Access to the cancellation callback held by the balancer is guarded by
 | |
| 	// the balancer's mutex, so it's safe to set the boolean from the callback.
 | |
| 	b.cancelConnectionTimer = sync.OnceFunc(func() {
 | |
| 		cancelled = true
 | |
| 		closeFn()
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	oldState := sd.rawConnectivityState
 | |
| 	sd.rawConnectivityState = newState.ConnectivityState
 | |
| 	// Previously relevant SubConns can still callback with state updates.
 | |
| 	// To prevent pickers from returning these obsolete SubConns, this logic
 | |
| 	// is included to check if the current list of active SubConns includes this
 | |
| 	// SubConn.
 | |
| 	if !b.isActiveSCData(sd) {
 | |
| 		return
 | |
| 	}
 | |
| 	if newState.ConnectivityState == connectivity.Shutdown {
 | |
| 		sd.effectiveState = connectivity.Shutdown
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Record a connection attempt when exiting CONNECTING.
 | |
| 	if newState.ConnectivityState == connectivity.TransientFailure {
 | |
| 		sd.connectionFailedInFirstPass = true
 | |
| 		connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
 | |
| 	}
 | |
| 
 | |
| 	if newState.ConnectivityState == connectivity.Ready {
 | |
| 		connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
 | |
| 		b.shutdownRemainingLocked(sd)
 | |
| 		if !b.addressList.seekTo(sd.addr) {
 | |
| 			// This should not fail as we should have only one SubConn after
 | |
| 			// entering READY. The SubConn should be present in the addressList.
 | |
| 			b.logger.Errorf("Address %q not found address list in  %v", sd.addr, b.addressList.addresses)
 | |
| 			return
 | |
| 		}
 | |
| 		if !b.healthCheckingEnabled {
 | |
| 			if b.logger.V(2) {
 | |
| 				b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn)
 | |
| 			}
 | |
| 
 | |
| 			sd.effectiveState = connectivity.Ready
 | |
| 			b.updateBalancerState(balancer.State{
 | |
| 				ConnectivityState: connectivity.Ready,
 | |
| 				Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}},
 | |
| 			})
 | |
| 			return
 | |
| 		}
 | |
| 		if b.logger.V(2) {
 | |
| 			b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn)
 | |
| 		}
 | |
| 		// Send a CONNECTING update to take the SubConn out of sticky-TF if
 | |
| 		// required.
 | |
| 		sd.effectiveState = connectivity.Connecting
 | |
| 		b.updateBalancerState(balancer.State{
 | |
| 			ConnectivityState: connectivity.Connecting,
 | |
| 			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 | |
| 		})
 | |
| 		sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) {
 | |
| 			b.updateSubConnHealthState(sd, scs)
 | |
| 		})
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// If the LB policy is READY, and it receives a subchannel state change,
 | |
| 	// it means that the READY subchannel has failed.
 | |
| 	// A SubConn can also transition from CONNECTING directly to IDLE when
 | |
| 	// a transport is successfully created, but the connection fails
 | |
| 	// before the SubConn can send the notification for READY. We treat
 | |
| 	// this as a successful connection and transition to IDLE.
 | |
| 	// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
 | |
| 	// part of the if condition below once the issue is fixed.
 | |
| 	if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
 | |
| 		// Once a transport fails, the balancer enters IDLE and starts from
 | |
| 		// the first address when the picker is used.
 | |
| 		b.shutdownRemainingLocked(sd)
 | |
| 		sd.effectiveState = newState.ConnectivityState
 | |
| 		// READY SubConn interspliced in between CONNECTING and IDLE, need to
 | |
| 		// account for that.
 | |
| 		if oldState == connectivity.Connecting {
 | |
| 			// A known issue (https://github.com/grpc/grpc-go/issues/7862)
 | |
| 			// causes a race that prevents the READY state change notification.
 | |
| 			// This works around it.
 | |
| 			connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target)
 | |
| 		}
 | |
| 		disconnectionsMetric.Record(b.metricsRecorder, 1, b.target)
 | |
| 		b.addressList.reset()
 | |
| 		b.updateBalancerState(balancer.State{
 | |
| 			ConnectivityState: connectivity.Idle,
 | |
| 			Picker:            &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
 | |
| 		})
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if b.firstPass {
 | |
| 		switch newState.ConnectivityState {
 | |
| 		case connectivity.Connecting:
 | |
| 			// The effective state can be in either IDLE, CONNECTING or
 | |
| 			// TRANSIENT_FAILURE. If it's  TRANSIENT_FAILURE, stay in
 | |
| 			// TRANSIENT_FAILURE until it's READY. See A62.
 | |
| 			if sd.effectiveState != connectivity.TransientFailure {
 | |
| 				sd.effectiveState = connectivity.Connecting
 | |
| 				b.updateBalancerState(balancer.State{
 | |
| 					ConnectivityState: connectivity.Connecting,
 | |
| 					Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 | |
| 				})
 | |
| 			}
 | |
| 		case connectivity.TransientFailure:
 | |
| 			sd.lastErr = newState.ConnectionError
 | |
| 			sd.effectiveState = connectivity.TransientFailure
 | |
| 			// Since we're re-using common SubConns while handling resolver
 | |
| 			// updates, we could receive an out of turn TRANSIENT_FAILURE from
 | |
| 			// a pass over the previous address list. Happy Eyeballs will also
 | |
| 			// cause out of order updates to arrive.
 | |
| 
 | |
| 			if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
 | |
| 				b.cancelConnectionTimer()
 | |
| 				if b.addressList.increment() {
 | |
| 					b.requestConnectionLocked()
 | |
| 					return
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			// End the first pass if we've seen a TRANSIENT_FAILURE from all
 | |
| 			// SubConns once.
 | |
| 			b.endFirstPassIfPossibleLocked(newState.ConnectionError)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// We have finished the first pass, keep re-connecting failing SubConns.
 | |
| 	switch newState.ConnectivityState {
 | |
| 	case connectivity.TransientFailure:
 | |
| 		b.numTF = (b.numTF + 1) % b.subConns.Len()
 | |
| 		sd.lastErr = newState.ConnectionError
 | |
| 		if b.numTF%b.subConns.Len() == 0 {
 | |
| 			b.updateBalancerState(balancer.State{
 | |
| 				ConnectivityState: connectivity.TransientFailure,
 | |
| 				Picker:            &picker{err: newState.ConnectionError},
 | |
| 			})
 | |
| 		}
 | |
| 		// We don't need to request re-resolution since the SubConn already
 | |
| 		// does that before reporting TRANSIENT_FAILURE.
 | |
| 		// TODO: #7534 - Move re-resolution requests from SubConn into
 | |
| 		// pick_first.
 | |
| 	case connectivity.Idle:
 | |
| 		sd.subConn.Connect()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the
 | |
| // addresses are tried and their SubConns have reported a failure.
 | |
| func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
 | |
| 	// An optimization to avoid iterating over the entire SubConn map.
 | |
| 	if b.addressList.isValid() {
 | |
| 		return
 | |
| 	}
 | |
| 	// Connect() has been called on all the SubConns. The first pass can be
 | |
| 	// ended if all the SubConns have reported a failure.
 | |
| 	for _, v := range b.subConns.Values() {
 | |
| 		sd := v.(*scData)
 | |
| 		if !sd.connectionFailedInFirstPass {
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	b.firstPass = false
 | |
| 	b.updateBalancerState(balancer.State{
 | |
| 		ConnectivityState: connectivity.TransientFailure,
 | |
| 		Picker:            &picker{err: lastErr},
 | |
| 	})
 | |
| 	// Start re-connecting all the SubConns that are already in IDLE.
 | |
| 	for _, v := range b.subConns.Values() {
 | |
| 		sd := v.(*scData)
 | |
| 		if sd.rawConnectivityState == connectivity.Idle {
 | |
| 			sd.subConn.Connect()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool {
 | |
| 	activeSD, found := b.subConns.Get(sd.addr)
 | |
| 	return found && activeSD == sd
 | |
| }
 | |
| 
 | |
| func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) {
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 	// Previously relevant SubConns can still callback with state updates.
 | |
| 	// To prevent pickers from returning these obsolete SubConns, this logic
 | |
| 	// is included to check if the current list of active SubConns includes
 | |
| 	// this SubConn.
 | |
| 	if !b.isActiveSCData(sd) {
 | |
| 		return
 | |
| 	}
 | |
| 	sd.effectiveState = state.ConnectivityState
 | |
| 	switch state.ConnectivityState {
 | |
| 	case connectivity.Ready:
 | |
| 		b.updateBalancerState(balancer.State{
 | |
| 			ConnectivityState: connectivity.Ready,
 | |
| 			Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}},
 | |
| 		})
 | |
| 	case connectivity.TransientFailure:
 | |
| 		b.updateBalancerState(balancer.State{
 | |
| 			ConnectivityState: connectivity.TransientFailure,
 | |
| 			Picker:            &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)},
 | |
| 		})
 | |
| 	case connectivity.Connecting:
 | |
| 		b.updateBalancerState(balancer.State{
 | |
| 			ConnectivityState: connectivity.Connecting,
 | |
| 			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
 | |
| 		})
 | |
| 	default:
 | |
| 		b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // updateBalancerState stores the state reported to the channel and calls
 | |
| // ClientConn.UpdateState(). As an optimization, it avoids sending duplicate
 | |
| // updates to the channel.
 | |
| func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) {
 | |
| 	// In case of TransientFailures allow the picker to be updated to update
 | |
| 	// the connectivity error, in all other cases don't send duplicate state
 | |
| 	// updates.
 | |
| 	if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure {
 | |
| 		return
 | |
| 	}
 | |
| 	b.forceUpdateConcludedStateLocked(newState)
 | |
| }
 | |
| 
 | |
| // forceUpdateConcludedStateLocked stores the state reported to the channel and
 | |
| // calls ClientConn.UpdateState().
 | |
| // A separate function is defined to force update the ClientConn state since the
 | |
| // channel doesn't correctly assume that LB policies start in CONNECTING and
 | |
| // relies on LB policy to send an initial CONNECTING update.
 | |
| func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) {
 | |
| 	b.state = newState.ConnectivityState
 | |
| 	b.cc.UpdateState(newState)
 | |
| }
 | |
| 
 | |
| type picker struct {
 | |
| 	result balancer.PickResult
 | |
| 	err    error
 | |
| }
 | |
| 
 | |
| func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
 | |
| 	return p.result, p.err
 | |
| }
 | |
| 
 | |
// idlePicker is the picker installed while the SubConn is IDLE. Calling Pick
// on it kicks the SubConn into CONNECTING by invoking exitIdle.
type idlePicker struct {
	// exitIdle is invoked on every Pick call.
	exitIdle func()
}
 | |
| 
 | |
| func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
 | |
| 	i.exitIdle()
 | |
| 	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
 | |
| }
 | |
| 
 | |
| // addressList manages sequentially iterating over addresses present in a list
 | |
| // of endpoints. It provides a 1 dimensional view of the addresses present in
 | |
| // the endpoints.
 | |
| // This type is not safe for concurrent access.
 | |
| type addressList struct {
 | |
| 	addresses []resolver.Address
 | |
| 	idx       int
 | |
| }
 | |
| 
 | |
| func (al *addressList) isValid() bool {
 | |
| 	return al.idx < len(al.addresses)
 | |
| }
 | |
| 
 | |
| func (al *addressList) size() int {
 | |
| 	return len(al.addresses)
 | |
| }
 | |
| 
 | |
| // increment moves to the next index in the address list.
 | |
| // This method returns false if it went off the list, true otherwise.
 | |
| func (al *addressList) increment() bool {
 | |
| 	if !al.isValid() {
 | |
| 		return false
 | |
| 	}
 | |
| 	al.idx++
 | |
| 	return al.idx < len(al.addresses)
 | |
| }
 | |
| 
 | |
| // currentAddress returns the current address pointed to in the addressList.
 | |
| // If the list is in an invalid state, it returns an empty address instead.
 | |
| func (al *addressList) currentAddress() resolver.Address {
 | |
| 	if !al.isValid() {
 | |
| 		return resolver.Address{}
 | |
| 	}
 | |
| 	return al.addresses[al.idx]
 | |
| }
 | |
| 
 | |
| func (al *addressList) reset() {
 | |
| 	al.idx = 0
 | |
| }
 | |
| 
 | |
| func (al *addressList) updateAddrs(addrs []resolver.Address) {
 | |
| 	al.addresses = addrs
 | |
| 	al.reset()
 | |
| }
 | |
| 
 | |
| // seekTo returns false if the needle was not found and the current index was
 | |
| // left unchanged.
 | |
| func (al *addressList) seekTo(needle resolver.Address) bool {
 | |
| 	for ai, addr := range al.addresses {
 | |
| 		if !equalAddressIgnoringBalAttributes(&addr, &needle) {
 | |
| 			continue
 | |
| 		}
 | |
| 		al.idx = ai
 | |
| 		return true
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // hasNext returns whether incrementing the addressList will result in moving
 | |
| // past the end of the list. If the list has already moved past the end, it
 | |
| // returns false.
 | |
| func (al *addressList) hasNext() bool {
 | |
| 	if !al.isValid() {
 | |
| 		return false
 | |
| 	}
 | |
| 	return al.idx+1 < len(al.addresses)
 | |
| }
 | |
| 
 | |
| // equalAddressIgnoringBalAttributes returns true is a and b are considered
 | |
| // equal. This is different from the Equal method on the resolver.Address type
 | |
| // which considers all fields to determine equality. Here, we only consider
 | |
| // fields that are meaningful to the SubConn.
 | |
| func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
 | |
| 	return a.Addr == b.Addr && a.ServerName == b.ServerName &&
 | |
| 		a.Attributes.Equal(b.Attributes) &&
 | |
| 		a.Metadata == b.Metadata
 | |
| }
 |