Updates swarmkit, grpc, and all related vendors

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
Upstream-commit: 52ed3e0896c203013388a3eb603658f345a9d80a
Component: engine
package node
import (
	"bytes"
	"crypto/tls"
	"encoding/json"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"reflect"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/docker/swarmkit/ca/keyutils"
	"github.com/docker/swarmkit/identity"

	"github.com/boltdb/bolt"
	"github.com/docker/docker/pkg/plugingetter"
	metrics "github.com/docker/go-metrics"
	"github.com/docker/swarmkit/agent"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/ca"
	"github.com/docker/swarmkit/connectionbroker"
	"github.com/docker/swarmkit/ioutils"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager"
	"github.com/docker/swarmkit/manager/encryption"
	"github.com/docker/swarmkit/remotes"
	"github.com/docker/swarmkit/xnet"
	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/status"
)

const (
	stateFilename     = "state.json"
	roleChangeTimeout = 16 * time.Second
)

var (
	nodeInfo    metrics.LabeledGauge
	nodeManager metrics.Gauge

	errNodeStarted    = errors.New("node: already started")
	errNodeNotStarted = errors.New("node: not started")
	certDirectory     = "certificates"

	// ErrInvalidUnlockKey is returned when we can't decrypt the TLS certificate
	ErrInvalidUnlockKey = errors.New("node is locked, and needs a valid unlock key")

	// ErrMandatoryFIPS is returned when the cluster we are joining mandates FIPS, but we are running in non-FIPS mode
	ErrMandatoryFIPS = errors.New("node is not FIPS-enabled but cluster requires FIPS")
)

func init() {
	ns := metrics.NewNamespace("swarm", "node", nil)
	nodeInfo = ns.NewLabeledGauge("info", "Information related to the swarm", "",
		"swarm_id",
		"node_id",
	)
	nodeManager = ns.NewGauge("manager", "Whether this node is a manager or not", "")
	metrics.Register(ns)
}
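
// Illustrative note (not part of the upstream file): with the "swarm"/"node"
// namespace above, the docker/go-metrics package typically exposes these
// gauges to Prometheus as "swarm_node_info" (labeled with swarm_id and
// node_id) and "swarm_node_manager"; the exact exported names depend on the
// go-metrics version in use.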

// Config provides values for a Node.
type Config struct {
	// Hostname is the name of the host for the agent instance.
	Hostname string

	// JoinAddr specifies the node that should be used for the initial
	// connection to another manager in the cluster. This should be only one
	// address, and it is optional; the actual remotes come from the stored
	// state.
	JoinAddr string

	// StateDir specifies the directory the node uses to keep the state of the
	// remote managers and certificates.
	StateDir string

	// JoinToken is the token to be used on the first certificate request.
	JoinToken string

	// ExternalCAs is a list of CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ForceNewCluster creates a new cluster from current raft state.
	ForceNewCluster bool

	// ListenControlAPI specifies the address the control API should listen on.
	ListenControlAPI string

	// ListenRemoteAPI specifies the address for the remote API that agents
	// and raft members connect to.
	ListenRemoteAPI string

	// AdvertiseRemoteAPI specifies the address that should be advertised
	// for connections to the remote API (including the raft service).
	AdvertiseRemoteAPI string

	// Executor specifies the executor to use for the agent.
	Executor exec.Executor

	// ElectionTick defines the number of ticks without a leader needed to
	// trigger a new election.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between each heartbeat sent
	// to other members for health-check purposes.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not an unlock key will be generated
	// when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting at rest. This
	// only applies to nodes that have already joined a cluster.
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node.
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter

	// FIPS is a boolean stating whether the node is FIPS-enabled.
	FIPS bool
}
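
// Illustrative sketch (not part of the upstream file): a minimal Config for
// bootstrapping a single-node cluster might look like the following. All
// field values here are assumptions made for the example, not defaults.
//
//	cfg := &node.Config{
//		Hostname:         "node-1",
//		StateDir:         "/var/lib/example-swarm",
//		ListenControlAPI: "/var/run/example-swarm/control.sock",
//		ListenRemoteAPI:  "0.0.0.0:4242",
//		Executor:         myExecutor, // any exec.Executor implementation
//	}
//	n, err := node.New(cfg)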

// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
	sync.RWMutex
	config           *Config
	remotes          *persistentRemotes
	connBroker       *connectionbroker.Broker
	role             string
	roleCond         *sync.Cond
	conn             *grpc.ClientConn
	connCond         *sync.Cond
	nodeID           string
	started          chan struct{}
	startOnce        sync.Once
	stopped          chan struct{}
	stopOnce         sync.Once
	ready            chan struct{} // closed when agent has completed registration and manager (if enabled) is ready to receive control requests
	closed           chan struct{}
	err              error
	agent            *agent.Agent
	manager          *manager.Manager
	notifyNodeChange chan *agent.NodeChanges // used by the agent to relay node updates from the dispatcher Session stream to (*Node).run
	unlockKey        []byte
}

type lastSeenRole struct {
	role api.NodeRole
}

// observe notes the latest value of this node role, and returns true if it
// is the first seen value, or is different from the most recently seen value.
func (l *lastSeenRole) observe(newRole api.NodeRole) bool {
	changed := l.role != newRole
	l.role = newRole
	return changed
}
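
// Illustrative sketch (not part of the upstream file): observe is what makes
// role handling edge-triggered rather than level-triggered, e.g.:
//
//	var l lastSeenRole             // zero value is api.NodeRoleWorker
//	l.observe(api.NodeRoleWorker)  // false: unchanged, nothing to do
//	l.observe(api.NodeRoleManager) // true: a change, kick off a renewal
//	l.observe(api.NodeRoleManager) // false: unchanged, no renewal loop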

// RemoteAPIAddr returns the address on which the remote manager API listens.
// It returns an error if the node is not a manager.
func (n *Node) RemoteAPIAddr() (string, error) {
	n.RLock()
	defer n.RUnlock()
	if n.manager == nil {
		return "", errors.New("manager is not running")
	}
	addr := n.manager.Addr()
	if addr == "" {
		return "", errors.New("manager addr is not set")
	}
	return addr, nil
}

// New returns a new Node instance.
func New(c *Config) (*Node, error) {
	if err := os.MkdirAll(c.StateDir, 0700); err != nil {
		return nil, err
	}
	stateFile := filepath.Join(c.StateDir, stateFilename)
	dt, err := ioutil.ReadFile(stateFile)
	var p []api.Peer
	if err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	if err == nil {
		if err := json.Unmarshal(dt, &p); err != nil {
			return nil, err
		}
	}
	n := &Node{
		remotes:          newPersistentRemotes(stateFile, p...),
		role:             ca.WorkerRole,
		config:           c,
		started:          make(chan struct{}),
		stopped:          make(chan struct{}),
		closed:           make(chan struct{}),
		ready:            make(chan struct{}),
		notifyNodeChange: make(chan *agent.NodeChanges, 1),
		unlockKey:        c.UnlockKey,
	}

	if n.config.JoinAddr != "" || n.config.ForceNewCluster {
		n.remotes = newPersistentRemotes(filepath.Join(n.config.StateDir, stateFilename))
		if n.config.JoinAddr != "" {
			n.remotes.Observe(api.Peer{Addr: n.config.JoinAddr}, remotes.DefaultObservationWeight)
		}
	}

	n.connBroker = connectionbroker.New(n.remotes)

	n.roleCond = sync.NewCond(n.RLocker())
	n.connCond = sync.NewCond(n.RLocker())
	return n, nil
}

// BindRemote starts a listener that exposes the remote API.
func (n *Node) BindRemote(ctx context.Context, listenAddr string, advertiseAddr string) error {
	n.RLock()
	defer n.RUnlock()

	if n.manager == nil {
		return errors.New("manager is not running")
	}

	return n.manager.BindRemote(ctx, manager.RemoteAddrs{
		ListenAddr:    listenAddr,
		AdvertiseAddr: advertiseAddr,
	})
}

// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
	err := errNodeStarted

	n.startOnce.Do(func() {
		close(n.started)
		go n.run(ctx)
		err = nil // clear error above, only once.
	})
	return err
}
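
// Illustrative sketch (not part of the upstream file) of the lifecycle a
// caller typically drives around Start; error handling is elided:
//
//	if err := n.Start(ctx); err != nil {
//		// errNodeStarted on any call after the first
//	}
//	select {
//	case <-n.Ready(): // agent registered; manager (if any) serving
//	case <-ctx.Done():
//	}
//	// ... use the node ...
//	_ = n.Stop(ctx) // blocks until the node has fully closed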

func (n *Node) currentRole() api.NodeRole {
	n.Lock()
	currentRole := api.NodeRoleWorker
	if n.role == ca.ManagerRole {
		currentRole = api.NodeRoleManager
	}
	n.Unlock()
	return currentRole
}

func (n *Node) run(ctx context.Context) (err error) {
	defer func() {
		n.err = err
		// close the n.closed channel to indicate that the Node has completely
		// terminated
		close(n.closed)
	}()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	ctx = log.WithModule(ctx, "node")

	// set up a goroutine to monitor the stop channel, and cancel the run
	// context when the node is stopped
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
		case <-n.stopped:
			cancel()
		}
	}(ctx)

	// First things first: get the SecurityConfig for this node. This includes
	// the certificate information, and the root CA. It also returns a cancel
	// function. This is needed because the SecurityConfig is a live object,
	// and provides a watch queue so that callers can observe changes to the
	// security config. This watch queue has to be closed, which is done by the
	// secConfigCancel function.
	//
	// It's also noteworthy that loading the security config with the node's
	// loadSecurityConfig method has the side effect of setting the node's ID
	// and role fields, meaning it isn't until after that point that the node
	// knows its ID.
	paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
	securityConfig, secConfigCancel, err := n.loadSecurityConfig(ctx, paths)
	if err != nil {
		return err
	}
	defer secConfigCancel()

	// Now that we have the security config, we can get a TLSRenewer, which is
	// a live component handling certificate rotation.
	renewer := ca.NewTLSRenewer(securityConfig, n.connBroker, paths.RootCA)

	// Now that we have the security goop all loaded, we know the Node's ID and
	// can add that to our logging context.
	ctx = log.WithLogger(ctx, log.G(ctx).WithField("node.id", n.NodeID()))

	// Next, set up the task database. The task database is used by the agent
	// to keep a persistent local record of its tasks. Since every manager also
	// has an agent, every node needs a task database, so we do this regardless
	// of role.
	taskDBPath := filepath.Join(n.config.StateDir, "worker", "tasks.db")
	// Doing os.MkdirAll will create the necessary directory path for the task
	// database if it doesn't already exist, and if it does already exist, no
	// error will be returned, so we use this regardless of whether this node
	// is new or not.
	if err := os.MkdirAll(filepath.Dir(taskDBPath), 0777); err != nil {
		return err
	}

	db, err := bolt.Open(taskDBPath, 0666, nil)
	if err != nil {
		return err
	}
	defer db.Close()

	// agentDone is a channel that represents the agent having exited. We start
	// the agent in a goroutine a few blocks down, and before that goroutine
	// exits, it closes this channel to signal to the goroutine just below to
	// terminate.
	agentDone := make(chan struct{})

	// This goroutine is the node changes loop. The n.notifyNodeChange
	// channel is passed to the agent. When a new node object gets sent down
	// to the agent, it gets passed back up to this node object, so that we can
	// check if a role update or a root certificate rotation is required. This
	// handles root rotation, but the renewer handles regular certificate
	// rotation.
	go func() {
		// lastNodeDesiredRole is the last-seen value of Node.Spec.DesiredRole,
		// used to make role changes "edge triggered" and avoid renewal loops.
		lastNodeDesiredRole := lastSeenRole{role: n.currentRole()}

		for {
			select {
			case <-agentDone:
				return
			case nodeChanges := <-n.notifyNodeChange:
				if nodeChanges.Node != nil {
					// This is a bit complex to be backward compatible with older CAs that
					// don't support the Node.Role field. They only use what's presently
					// called DesiredRole.
					// 1) If DesiredRole changes, kick off a certificate renewal. The renewal
					// is delayed slightly to give Role time to change as well if this is
					// a newer CA. If the certificate we get back doesn't have the expected
					// role, we continue renewing with exponential backoff.
					// 2) If the server is sending us IssuanceStateRotate, renew the cert as
					// requested by the CA.
					desiredRoleChanged := lastNodeDesiredRole.observe(nodeChanges.Node.Spec.DesiredRole)
					if desiredRoleChanged {
						switch nodeChanges.Node.Spec.DesiredRole {
						case api.NodeRoleManager:
							renewer.SetExpectedRole(ca.ManagerRole)
						case api.NodeRoleWorker:
							renewer.SetExpectedRole(ca.WorkerRole)
						}
					}
					if desiredRoleChanged || nodeChanges.Node.Certificate.Status.State == api.IssuanceStateRotate {
						renewer.Renew()
					}
				}

				if nodeChanges.RootCert != nil {
					if bytes.Equal(nodeChanges.RootCert, securityConfig.RootCA().Certs) {
						continue
					}
					newRootCA, err := ca.NewRootCA(nodeChanges.RootCert, nil, nil, ca.DefaultNodeCertExpiration, nil)
					if err != nil {
						log.G(ctx).WithError(err).Error("invalid new root certificate from the dispatcher")
						continue
					}
					if err := securityConfig.UpdateRootCA(&newRootCA); err != nil {
						log.G(ctx).WithError(err).Error("could not use new root CA from dispatcher")
						continue
					}
					if err := ca.SaveRootCA(newRootCA, paths.RootCA); err != nil {
						log.G(ctx).WithError(err).Error("could not save new root certificate from the dispatcher")
						continue
					}
				}
			}
		}
	}()

	// Now we're going to launch the main component goroutines: the Agent, the
	// Manager (maybe), and the certificate updates loop. We shouldn't exit
	// the node object until all 3 of these components have terminated, so we
	// create a waitgroup to block termination of the node until then.
	var wg sync.WaitGroup
	wg.Add(3)

	// These two blocks update some of the metrics settings.
	nodeInfo.WithValues(
		securityConfig.ClientTLSCreds.Organization(),
		securityConfig.ClientTLSCreds.NodeID(),
	).Set(1)

	if n.currentRole() == api.NodeRoleManager {
		nodeManager.Set(1)
	} else {
		nodeManager.Set(0)
	}

	// We created the renewer way up when we were creating the SecurityConfig
	// at the beginning of run, but now we're ready to start receiving
	// CertificateUpdates, and launch a goroutine to handle this. updates is a
	// channel we iterate over containing the results of certificate renewals.
	updates := renewer.Start(ctx)
	go func() {
		for certUpdate := range updates {
			if certUpdate.Err != nil {
				logrus.Warnf("error renewing TLS certificate: %v", certUpdate.Err)
				continue
			}
			// Set the new role, and notify our waiting role changing logic
			// that the role has changed.
			n.Lock()
			n.role = certUpdate.Role
			n.roleCond.Broadcast()
			n.Unlock()

			// Export the new role for metrics
			if n.currentRole() == api.NodeRoleManager {
				nodeManager.Set(1)
			} else {
				nodeManager.Set(0)
			}
		}

		wg.Done()
	}()

	// and, finally, start the two main components: the manager and the agent
	role := n.role

	// Channels to signal when these respective components are up and ready to
	// go.
	managerReady := make(chan struct{})
	agentReady := make(chan struct{})
	// these variables are defined in this scope so that they're closed over by
	// the respective goroutines below.
	var managerErr error
	var agentErr error
	go func() {
		// superviseManager is a routine that watches our manager role
		managerErr = n.superviseManager(ctx, securityConfig, paths.RootCA, managerReady, renewer) // store err and loop
		wg.Done()
		cancel()
	}()
	go func() {
		agentErr = n.runAgent(ctx, db, securityConfig, agentReady)
		wg.Done()
		cancel()
		close(agentDone)
	}()

	// This goroutine is what signals that the node has fully started by
	// closing the n.ready channel. First, it waits for the agent to start.
	// Then, if this node is a manager, it will wait on either the manager
	// starting, or the node role changing. This ensures that if the node is
	// demoted before the manager starts, it doesn't get stuck.
	go func() {
		<-agentReady
		if role == ca.ManagerRole {
			workerRole := make(chan struct{})
			waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
			go func() {
				if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
					close(workerRole)
				}
			}()
			select {
			case <-managerReady:
			case <-workerRole:
			}
			waitRoleCancel()
		}
		close(n.ready)
	}()

	// And, finally, we park and wait for the node to close up. If we get any
	// error other than context canceled, we return it.
	wg.Wait()
	if managerErr != nil && errors.Cause(managerErr) != context.Canceled {
		return managerErr
	}
	if agentErr != nil && errors.Cause(agentErr) != context.Canceled {
		return agentErr
	}
	// NOTE(dperny): we return err here, but the last time I can see err being
	// set is when we open the boltdb way up in this method, so I don't know
	// what returning err is supposed to do.
	return err
}

// Stop stops node execution
func (n *Node) Stop(ctx context.Context) error {
	select {
	case <-n.started:
	default:
		return errNodeNotStarted
	}
	// ask agent to clean up assignments
	n.Lock()
	if n.agent != nil {
		if err := n.agent.Leave(ctx); err != nil {
			log.G(ctx).WithError(err).Error("agent failed to clean up assignments")
		}
	}
	n.Unlock()

	n.stopOnce.Do(func() {
		close(n.stopped)
	})

	select {
	case <-n.closed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Err returns the error that caused the node to shut down, or nil. Err blocks
// until the node has fully shut down.
func (n *Node) Err(ctx context.Context) error {
	select {
	case <-n.closed:
		return n.err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runAgent starts the node's agent. When the agent has started, the provided
// ready channel is closed. When the agent exits, this will return the error
// that caused it.
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, securityConfig *ca.SecurityConfig, ready chan<- struct{}) error {
	// First, get a channel for knowing when a remote peer has been selected.
	// The value returned from remotesCh is ignored; we just need to know
	// when the peer is selected.
	remotesCh := n.remotes.WaitSelect(ctx)
	// then, we set up a new context to pass specifically to
	// ListenControlSocket, and start that method to wait on a connection on
	// the cluster control API.
	waitCtx, waitCancel := context.WithCancel(ctx)
	controlCh := n.ListenControlSocket(waitCtx)

	// The goal here is to wait until we either have a remote peer selected or
	// a connection to the control socket. These are both ways to connect the
	// agent to a manager, and we need to wait until one or the other is
	// available to start the agent.
waitPeer:
	for {
		select {
		case <-ctx.Done():
			break waitPeer
		case <-remotesCh:
			break waitPeer
		case conn := <-controlCh:
			// conn will probably be nil the first time we call this,
			// but only a non-nil conn represents an actual connection.
			if conn != nil {
				break waitPeer
			}
		}
	}

	// We can stop listening for new control socket connections once we're
	// ready
	waitCancel()

	// NOTE(dperny): not sure why we need to recheck the context here. I guess
	// it avoids a race if the context was canceled at the same time that a
	// connection or peer was available. I think it's just an optimization.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Now we can go ahead and configure, create, and start the agent.
	secChangesCh, secChangesCancel := securityConfig.Watch()
	defer secChangesCancel()

	rootCA := securityConfig.RootCA()
	issuer := securityConfig.IssuerInfo()

	agentConfig := &agent.Config{
		Hostname:         n.config.Hostname,
		ConnBroker:       n.connBroker,
		Executor:         n.config.Executor,
		DB:               db,
		NotifyNodeChange: n.notifyNodeChange,
		NotifyTLSChange:  secChangesCh,
		Credentials:      securityConfig.ClientTLSCreds,
		NodeTLSInfo: &api.NodeTLSInfo{
			TrustRoot:           rootCA.Certs,
			CertIssuerPublicKey: issuer.PublicKey,
			CertIssuerSubject:   issuer.Subject,
		},
		FIPS: n.config.FIPS,
	}
	// if a join address has been specified, then if the agent fails to connect
	// due to a TLS error, fail fast - don't keep re-trying to join
	if n.config.JoinAddr != "" {
		agentConfig.SessionTracker = &firstSessionErrorTracker{}
	}

	a, err := agent.New(agentConfig)
	if err != nil {
		return err
	}
	if err := a.Start(ctx); err != nil {
		return err
	}

	n.Lock()
	n.agent = a
	n.Unlock()

	defer func() {
		n.Lock()
		n.agent = nil
		n.Unlock()
	}()

	// when the agent indicates that it is ready, we close the ready channel.
	go func() {
		<-a.Ready()
		close(ready)
	}()

	// todo: manually call stop on context cancellation?

	return a.Err(context.Background())
}

// Ready returns a channel that is closed after the node's initialization
// completes for the first time.
func (n *Node) Ready() <-chan struct{} {
	return n.ready
}

func (n *Node) setControlSocket(conn *grpc.ClientConn) {
	n.Lock()
	if n.conn != nil {
		n.conn.Close()
	}
	n.conn = conn
	n.connBroker.SetLocalConn(conn)
	n.connCond.Broadcast()
	n.Unlock()
}

// ListenControlSocket listens for changes to the connection used for
// managing the cluster control API.
func (n *Node) ListenControlSocket(ctx context.Context) <-chan *grpc.ClientConn {
	c := make(chan *grpc.ClientConn, 1)
	n.RLock()
	conn := n.conn
	c <- conn
	done := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			n.connCond.Broadcast()
		case <-done:
		}
	}()
	go func() {
		defer close(c)
		defer close(done)
		defer n.RUnlock()
		for {
			select {
			case <-ctx.Done():
				return
			default:
			}
			if conn == n.conn {
				n.connCond.Wait()
				continue
			}
			conn = n.conn
			select {
			case c <- conn:
			case <-ctx.Done():
				return
			}
		}
	}()
	return c
}
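
// Illustrative sketch (not part of the upstream file): a consumer of
// ListenControlSocket first receives the current connection (possibly nil)
// and then every replacement until the context is canceled:
//
//	for conn := range n.ListenControlSocket(ctx) {
//		if conn != nil {
//			client := api.NewControlClient(conn) // hypothetical consumer code
//			_ = client
//		}
//	}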

// NodeID returns the current node's ID. May be empty if not set.
func (n *Node) NodeID() string {
	n.RLock()
	defer n.RUnlock()
	return n.nodeID
}

// Manager returns the manager instance started by the node. May be nil.
func (n *Node) Manager() *manager.Manager {
	n.RLock()
	defer n.RUnlock()
	return n.manager
}

// Agent returns the agent instance started by the node. May be nil.
func (n *Node) Agent() *agent.Agent {
	n.RLock()
	defer n.RUnlock()
	return n.agent
}

// IsStateDirty returns true if any objects have been added to raft which make
// the state "dirty". Currently, the existence of any object other than the
// default cluster or the local node implies a dirty state.
func (n *Node) IsStateDirty() (bool, error) {
	n.RLock()
	defer n.RUnlock()

	if n.manager == nil {
		return false, errors.New("node is not a manager")
	}

	return n.manager.IsStateDirty()
}

// Remotes returns a list of peers known to the node.
func (n *Node) Remotes() []api.Peer {
	weights := n.remotes.Weights()
	remotes := make([]api.Peer, 0, len(weights))
	for p := range weights {
		remotes = append(remotes, p)
	}
	return remotes
}

// isMandatoryFIPSClusterID returns whether the cluster ID (the organization
// on the node's TLS credentials) indicates that the cluster mandates FIPS
// mode. Such cluster IDs start with "FIPS." as a prefix.
func isMandatoryFIPSClusterID(securityConfig *ca.SecurityConfig) bool {
	return strings.HasPrefix(securityConfig.ClientTLSCreds.Organization(), "FIPS.")
}

// isMandatoryFIPSClusterJoinToken returns whether the given join token
// indicates that the cluster mandates FIPS mode.
func isMandatoryFIPSClusterJoinToken(joinToken string) bool {
	if parsed, err := ca.ParseJoinToken(joinToken); err == nil {
		return parsed.FIPS
	}
	return false
}

func generateFIPSClusterID() string {
	return "FIPS." + identity.NewID()
}
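
// Illustrative note (not part of the upstream file): a generated FIPS
// cluster ID is the "FIPS." prefix followed by a random identity, e.g.
// "FIPS.2pjmpdhmzueosxaqrez4gmml9" (a made-up sample; the suffix format
// comes from identity.NewID and may vary across versions).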

func (n *Node) loadSecurityConfig(ctx context.Context, paths *ca.SecurityConfigPaths) (*ca.SecurityConfig, func() error, error) {
	var (
		securityConfig *ca.SecurityConfig
		cancel         func() error
	)

	krw := ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{FIPS: n.config.FIPS})
	// if FIPS is required, we want to make sure our key is stored in PKCS8 format
	if n.config.FIPS {
		krw.SetKeyFormatter(keyutils.FIPS)
	}
	if err := krw.Migrate(); err != nil {
		return nil, nil, err
	}

	// Check if we already have a valid certificate on disk.
	rootCA, err := ca.GetLocalRootCA(paths.RootCA)
	if err != nil && err != ca.ErrNoLocalRootCA {
		return nil, nil, err
	}
	if err == nil {
		// if forcing a new cluster, we allow the certificates to be expired - a new set will be generated
		securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
		if err != nil {
			_, isInvalidKEK := errors.Cause(err).(ca.ErrInvalidKEK)
			if isInvalidKEK {
				return nil, nil, ErrInvalidUnlockKey
			} else if !os.IsNotExist(err) {
				return nil, nil, errors.Wrapf(err, "error while loading TLS certificate in %s", paths.Node.Cert)
			}
		}
	}

	if securityConfig == nil {
		if n.config.JoinAddr == "" {
			// if we're not joining a cluster, bootstrap a new one - and we have to set the unlock key
			n.unlockKey = nil
			if n.config.AutoLockManagers {
				n.unlockKey = encryption.GenerateSecretKey()
			}
			krw = ca.NewKeyReadWriter(paths.Node, n.unlockKey, &manager.RaftDEKData{FIPS: n.config.FIPS})
			rootCA, err = ca.CreateRootCA(ca.DefaultRootCN)
			if err != nil {
				return nil, nil, err
			}
			if err := ca.SaveRootCA(rootCA, paths.RootCA); err != nil {
				return nil, nil, err
			}
			log.G(ctx).Debug("generated CA key and certificate")
		} else if err == ca.ErrNoLocalRootCA { // from previous error loading the root CA from disk
			// if we are attempting to join another cluster, which has a FIPS join token, and we are not FIPS, error
			if n.config.JoinAddr != "" && isMandatoryFIPSClusterJoinToken(n.config.JoinToken) && !n.config.FIPS {
				return nil, nil, ErrMandatoryFIPS
			}
			rootCA, err = ca.DownloadRootCA(ctx, paths.RootCA, n.config.JoinToken, n.connBroker)
			if err != nil {
				return nil, nil, err
			}
			log.G(ctx).Debug("downloaded CA certificate")
		}

		// Obtain new certs and set up TLS certificate renewal for this node:
		// - If certificates weren't present on disk, we call CreateSecurityConfig, which blocks
		//   until a valid certificate has been issued.
		// - We wait for CreateSecurityConfig to finish since we need a certificate to operate.

		// Attempt to load certificate from disk
		securityConfig, cancel, err = ca.LoadSecurityConfig(ctx, rootCA, krw, n.config.ForceNewCluster)
		if err == nil {
			log.G(ctx).WithFields(logrus.Fields{
				"node.id": securityConfig.ClientTLSCreds.NodeID(),
			}).Debugf("loaded TLS certificate")
		} else {
			if _, ok := errors.Cause(err).(ca.ErrInvalidKEK); ok {
				return nil, nil, ErrInvalidUnlockKey
			}
			log.G(ctx).WithError(err).Debugf("no node credentials found in: %s", krw.Target())

			// if we are attempting to join another cluster, which has a FIPS join token, and we are not FIPS, error
			if n.config.JoinAddr != "" && isMandatoryFIPSClusterJoinToken(n.config.JoinToken) && !n.config.FIPS {
				return nil, nil, ErrMandatoryFIPS
			}

			requestConfig := ca.CertificateRequestConfig{
				Token:        n.config.JoinToken,
				Availability: n.config.Availability,
				ConnBroker:   n.connBroker,
			}
			// If this is a new cluster, we want to name the cluster ID "FIPS.<something>"
			if n.config.FIPS {
				requestConfig.Organization = generateFIPSClusterID()
			}
			securityConfig, cancel, err = rootCA.CreateSecurityConfig(ctx, krw, requestConfig)

			if err != nil {
				return nil, nil, err
			}
		}
	}

	if isMandatoryFIPSClusterID(securityConfig) && !n.config.FIPS {
		return nil, nil, ErrMandatoryFIPS
	}

	n.Lock()
	n.role = securityConfig.ClientTLSCreds.Role()
	n.nodeID = securityConfig.ClientTLSCreds.NodeID()
	n.roleCond.Broadcast()
	n.Unlock()

	return securityConfig, cancel, nil
}

func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
	opts := []grpc.DialOption{
		grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
		grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
	}
	insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
	opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
	addr := n.config.ListenControlAPI
	opts = append(opts, grpc.WithDialer(
		func(addr string, timeout time.Duration) (net.Conn, error) {
			return xnet.DialTimeoutLocal(addr, timeout)
		}))
	conn, err := grpc.Dial(addr, opts...)
	if err != nil {
		return err
	}
	client := api.NewHealthClient(conn)
	for {
		resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "ControlAPI"})
		if err != nil {
			return err
		}
		if resp.Status == api.HealthCheckResponse_SERVING {
			break
		}
		time.Sleep(500 * time.Millisecond)
	}
	n.setControlSocket(conn)
	if ready != nil {
		close(ready)
	}
	return nil
}

// waitRole takes a context and a role. It then blocks until the context is
// canceled or the node's role updates to the provided role. It returns nil
// when the node has acquired the provided role, or ctx.Err() if the context
// is canceled.
func (n *Node) waitRole(ctx context.Context, role string) error {
	n.roleCond.L.Lock()
	if role == n.role {
		n.roleCond.L.Unlock()
		return nil
	}
	finishCh := make(chan struct{})
	defer close(finishCh)
	go func() {
		select {
		case <-finishCh:
		case <-ctx.Done():
			// call broadcast to shut down this function
			n.roleCond.Broadcast()
		}
	}()
	defer n.roleCond.L.Unlock()
	for role != n.role {
		n.roleCond.Wait()
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
	}

	return nil
}
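
// Illustrative note (not part of the upstream file): sync.Cond has no native
// context support, so waitRole pairs each Wait with a watcher goroutine that
// calls Broadcast when ctx is canceled; the loop then wakes up, rechecks the
// context, and returns ctx.Err(). This broadcast-on-cancel pattern is the
// usual workaround for waiting on a condition variable with cancellation.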

// runManager runs the manager on this node. It returns a boolean indicating if
// the stoppage was due to a role change, and an error indicating why the
// manager stopped.
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, workerRole <-chan struct{}) (bool, error) {
	// First, set up this manager's advertise and listen addresses, if
	// provided. They might not be provided if this node is joining the cluster
	// instead of creating a new one.
	var remoteAPI *manager.RemoteAddrs
	if n.config.ListenRemoteAPI != "" {
		remoteAPI = &manager.RemoteAddrs{
			ListenAddr:    n.config.ListenRemoteAPI,
			AdvertiseAddr: n.config.AdvertiseRemoteAPI,
		}
	}

	joinAddr := n.config.JoinAddr
	if joinAddr == "" {
		remoteAddr, err := n.remotes.Select(n.NodeID())
		if err == nil {
			joinAddr = remoteAddr.Addr
		}
	}

	m, err := manager.New(&manager.Config{
		ForceNewCluster:  n.config.ForceNewCluster,
		RemoteAPI:        remoteAPI,
		ControlAPI:       n.config.ListenControlAPI,
		SecurityConfig:   securityConfig,
		ExternalCAs:      n.config.ExternalCAs,
		JoinRaft:         joinAddr,
		ForceJoin:        n.config.JoinAddr != "",
		StateDir:         n.config.StateDir,
		HeartbeatTick:    n.config.HeartbeatTick,
		ElectionTick:     n.config.ElectionTick,
		AutoLockManagers: n.config.AutoLockManagers,
		UnlockKey:        n.unlockKey,
		Availability:     n.config.Availability,
		PluginGetter:     n.config.PluginGetter,
		RootCAPaths:      rootPaths,
		FIPS:             n.config.FIPS,
	})
	if err != nil {
		return false, err
	}

	// The done channel is used to signal that the manager has exited.
	done := make(chan struct{})
	// runErr is an error value set by the goroutine that runs the manager
	var runErr error

	// The context used to start this might have a logger associated with it
	// that we'd like to reuse, but we don't want to use that context, so we
	// pass to the goroutine only the logger, and create a new context with
	// that logger.
	go func(logger *logrus.Entry) {
		if err := m.Run(log.WithLogger(context.Background(), logger)); err != nil {
			runErr = err
		}
		close(done)
	}(log.G(ctx))

	// clearData is set in the select below, and is used to signal why the
	// manager is stopping, and indicate whether or not to delete raft data and
	// keys when stopping the manager.
	var clearData bool
	defer func() {
		n.Lock()
		n.manager = nil
		n.Unlock()
		m.Stop(ctx, clearData)
		<-done
		n.setControlSocket(nil)
	}()

	n.Lock()
	n.manager = m
	n.Unlock()

	connCtx, connCancel := context.WithCancel(ctx)
	defer connCancel()

	// launch a goroutine that will manage our local connection to the manager
	// from the agent. Remember the managerReady channel created way back in
	// run? This is actually where we close it. Not when the manager starts,
	// but when a connection to the control socket has been established.
	go n.initManagerConnection(connCtx, ready)

	// wait for manager stop or for role change
	// The manager can be stopped one of 4 ways:
	// 1. The manager may have errored out and returned an error, closing the
	// done channel in the process
	// 2. The node may have been demoted to a worker. In this case, we're gonna
	// have to stop the manager ourselves, setting clearData to true so the
	// local raft data, certs, keys, etc, are nuked.
	// 3. The manager may have been booted from raft. This could happen if it's
	// removed from the raft quorum but the role update hasn't registered
	// yet. The fact that there is more than 1 code path to cause the
	// manager to exit is a possible source of bugs.
	// 4. The context may have been canceled from above, in which case we
	// should stop the manager ourselves, but indicate that this is NOT a
	// demotion.
	select {
	case <-done:
		return false, runErr
	case <-workerRole:
		log.G(ctx).Info("role changed to worker, stopping manager")
		clearData = true
	case <-m.RemovedFromRaft():
		log.G(ctx).Info("manager removed from raft cluster, stopping manager")
		clearData = true
	case <-ctx.Done():
		return false, ctx.Err()
	}
	return clearData, nil
}

// superviseManager controls whether or not we are running a manager on this
// node.
func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, rootPaths ca.CertPaths, ready chan struct{}, renewer *ca.TLSRenewer) error {
	// superviseManager is a loop, because we can come in and out of being a
	// manager, and need to appropriately handle that without disrupting the
	// node functionality.
	for {
		// if we're not a manager, we're just gonna park here and wait until we
		// are. For normal agent nodes, we'll stay here forever, as intended.
		if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
			return err
		}

		// Once we know we are a manager, we get ourselves ready for when we
		// lose that role. We create a channel to signal that we've become a
		// worker, and close it when n.waitRole completes.
		workerRole := make(chan struct{})
		waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
		go func() {
			if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
				close(workerRole)
			}
		}()

		// the ready channel passed to superviseManager is in turn passed down
		// to the runManager function. It's used to signal to the caller that
		// the manager has started.
		wasRemoved, err := n.runManager(ctx, securityConfig, rootPaths, ready, workerRole)
		if err != nil {
			waitRoleCancel()
			return errors.Wrap(err, "manager stopped")
		}

		// If the manager stopped running and our role is still
		// "manager", it's possible that the manager was demoted and
		// the agent hasn't realized this yet. We should wait for the
		// role to change instead of restarting the manager immediately.
		err = func() error {
			timer := time.NewTimer(roleChangeTimeout)
			defer timer.Stop()
			defer waitRoleCancel()

			select {
			case <-timer.C:
			case <-workerRole:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}

			if !wasRemoved {
				log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
				return nil
			}
			// We need to be extra careful about restarting the
			// manager. It may cause the node to wrongly join under
			// a new Raft ID. Since we didn't see a role change
			// yet, force a certificate renewal. If the certificate
			// comes back with a worker role, we know we shouldn't
			// restart the manager. However, if we don't see
			// workerRole get closed, it means we didn't switch to
			// a worker certificate, either because we couldn't
			// contact a working CA, or because we've been
			// re-promoted. In this case, we must assume we were
			// re-promoted, and restart the manager.
			log.G(ctx).Warn("failed to get worker role after manager stop, forcing certificate renewal")
			timer.Reset(roleChangeTimeout)

			renewer.Renew()

			// Now that the renewal request has been sent to the
			// renewal goroutine, wait for a change in role.
			select {
			case <-timer.C:
				log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
			case <-workerRole:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		}()
		if err != nil {
			return err
		}

		// set ready to nil after the first time we've gone through this, as we
		// don't need to signal after the first time that the manager is ready.
		ready = nil
	}
}

// DowngradeKey reverts the node key to the older format so that it can
// run on older versions of swarmkit.
func (n *Node) DowngradeKey() error {
	paths := ca.NewConfigPaths(filepath.Join(n.config.StateDir, certDirectory))
	krw := ca.NewKeyReadWriter(paths.Node, n.config.UnlockKey, nil)

	return krw.DowngradeKey()
}

type persistentRemotes struct {
	sync.RWMutex
	c *sync.Cond
	remotes.Remotes
	storePath      string
	lastSavedState []api.Peer
}

func newPersistentRemotes(f string, peers ...api.Peer) *persistentRemotes {
	pr := &persistentRemotes{
		storePath: f,
		Remotes:   remotes.NewRemotes(peers...),
	}
	pr.c = sync.NewCond(pr.RLocker())
	return pr
}

func (s *persistentRemotes) Observe(peer api.Peer, weight int) {
	s.Lock()
	defer s.Unlock()
	s.Remotes.Observe(peer, weight)
	s.c.Broadcast()
	if err := s.save(); err != nil {
		logrus.Errorf("error writing cluster state file: %v", err)
	}
}

func (s *persistentRemotes) Remove(peers ...api.Peer) {
	s.Lock()
	defer s.Unlock()
	s.Remotes.Remove(peers...)
	if err := s.save(); err != nil {
		logrus.Errorf("error writing cluster state file: %v", err)
	}
}

func (s *persistentRemotes) save() error {
	weights := s.Weights()
	remotes := make([]api.Peer, 0, len(weights))
	for r := range weights {
		remotes = append(remotes, r)
	}
	sort.Sort(sortablePeers(remotes))
	if reflect.DeepEqual(remotes, s.lastSavedState) {
		return nil
	}
	dt, err := json.Marshal(remotes)
	if err != nil {
		return err
	}
	s.lastSavedState = remotes
	return ioutils.AtomicWriteFile(s.storePath, dt, 0600)
}

// WaitSelect waits until at least one remote becomes available and then selects one.
func (s *persistentRemotes) WaitSelect(ctx context.Context) <-chan api.Peer {
	c := make(chan api.Peer, 1)
	s.RLock()
	done := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			s.c.Broadcast()
		case <-done:
		}
	}()
	go func() {
		defer s.RUnlock()
		defer close(c)
		defer close(done)
		for {
			if ctx.Err() != nil {
				return
			}
			p, err := s.Select()
			if err == nil {
				c <- p
				return
			}
			s.c.Wait()
		}
	}()
	return c
}

// sortablePeers is a sort wrapper for []api.Peer
type sortablePeers []api.Peer

func (sp sortablePeers) Less(i, j int) bool { return sp[i].NodeID < sp[j].NodeID }

func (sp sortablePeers) Len() int { return len(sp) }

func (sp sortablePeers) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] }

// firstSessionErrorTracker is a utility that helps determine whether the agent should exit after
// a TLS failure on establishing the first session. This should only happen if a join address
// is specified. If establishing the first session succeeds, but later on some session fails
// because of a TLS error, we don't want to exit the agent because a previously successful
// session indicates that the TLS error may be a transient issue.
type firstSessionErrorTracker struct {
	mu               sync.Mutex
	pastFirstSession bool
	err              error
}

func (fs *firstSessionErrorTracker) SessionEstablished() {
	fs.mu.Lock()
	fs.pastFirstSession = true
	fs.mu.Unlock()
}

func (fs *firstSessionErrorTracker) SessionError(err error) {
	fs.mu.Lock()
	fs.err = err
	fs.mu.Unlock()
}

// SessionClosed returns an error if we haven't yet established a session, and
// we get a grpc error as a result of an X509 failure.
func (fs *firstSessionErrorTracker) SessionClosed() error {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	// if we've successfully established at least 1 session, never return
	// errors
	if fs.pastFirstSession {
		return nil
	}

	// get the GRPC status from the error, because we only care about GRPC
	// errors
	grpcStatus, ok := status.FromError(fs.err)
	// if this isn't a GRPC error, it's not an error we return from this method
	if !ok {
		return nil
	}

	// NOTE(dperny, cyli): grpc does not expose the error type, which means we
	// have to do string matching to figure out if it's an x509 error.
	//
	// The error we're looking for has "connection error:", then says
	// "transport:" and finally has "x509:"
	// specifically, the connection error description reads:
	//
	//   transport: authentication handshake failed: x509: certificate signed by unknown authority
	//
	// This string matching has caused trouble in the past. Specifically, at
	// some point between grpc versions 1.3.0 and 1.7.5, the string we were
	// matching changed from "transport: x509" to "transport: authentication
	// handshake failed: x509", which was an issue because we were matching for
	// the string "transport: x509:".
	//
	// In GRPC >= 1.10.x, transient errors like TLS errors became hidden by the
	// load balancing that GRPC does. In GRPC 1.11.x, they were exposed again
	// (usually) in RPC calls, but the error string then became:
	//
	//   rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: authentication handshake failed: x509: certificate signed by unknown authority"
	//
	// It also went from an Internal error to an Unavailable error. So we're just going
	// to search for the string: "transport: authentication handshake failed: x509:" since
	// we want to fail for ALL x509 failures, not just unknown authority errors.

	if !strings.Contains(grpcStatus.Message(), "connection error") ||
		!strings.Contains(grpcStatus.Message(), "transport: authentication handshake failed: x509:") {
		return nil
	}
	return fs.err
}