Merge component 'engine' from git@github.com:moby/moby master
@@ -842,12 +842,11 @@ definitions:
    Volumes:
      description: "An object mapping mount point paths inside the container to empty objects."
      type: "object"
      properties:
        additionalProperties:
          type: "object"
          enum:
            - {}
          default: {}
      additionalProperties:
        type: "object"
        enum:
          - {}
        default: {}
    WorkingDir:
      description: "The working directory for commands to run in."
      type: "string"
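For readers mapping the schema above onto the Go client, Volumes and WorkingDir correspond to fields of container.Config in github.com/docker/docker/api/types/container. The sketch below is illustrative only: the image, paths, and container name are assumptions, not part of this commit, and the ContainerCreate call shape mirrors the integration test further down in this diff.

package example

import (
    "context"

    "github.com/docker/docker/api/types/container"
    "github.com/docker/docker/api/types/network"
    "github.com/docker/docker/client"
)

// createWithVolume is a minimal sketch: Volumes maps a mount point path inside
// the container to an empty object, exactly as the schema above describes, and
// WorkingDir sets the working directory for commands to run in.
func createWithVolume(ctx context.Context, cli client.APIClient) (string, error) {
    resp, err := cli.ContainerCreate(ctx,
        &container.Config{
            Image:      "busybox",
            WorkingDir: "/data",
            Volumes:    map[string]struct{}{"/data": {}},
        },
        &container.HostConfig{},
        &network.NetworkingConfig{},
        "volume-example",
    )
    if err != nil {
        return "", err
    }
    return resp.ID, nil
}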
@@ -239,6 +239,9 @@ func (daemon *Daemon) setupSecretDir(c *container.Container) (setupErr error) {
        if err := os.Chown(fPath, rootIDs.UID+uid, rootIDs.GID+gid); err != nil {
            return errors.Wrap(err, "error setting ownership for secret")
        }
        if err := os.Chmod(fPath, s.File.Mode); err != nil {
            return errors.Wrap(err, "error setting file mode for secret")
        }
    }

    label.Relabel(localMountPath, c.MountLabel, false)

@@ -320,6 +323,9 @@ func (daemon *Daemon) setupConfigDir(c *container.Container) (setupErr error) {
        if err := os.Chown(fPath, rootIDs.UID+uid, rootIDs.GID+gid); err != nil {
            return errors.Wrap(err, "error setting ownership for config")
        }
        if err := os.Chmod(fPath, configRef.File.Mode); err != nil {
            return errors.Wrap(err, "error setting file mode for config")
        }

        label.Relabel(fPath, c.MountLabel, false)
    }
@@ -208,7 +208,7 @@ func (svm *serviceVM) hotAddVHDsAtStart(mvds ...hcsshim.MappedVirtualDisk) error
        }

        if err := svm.config.HotAddVhd(mvd.HostPath, mvd.ContainerPath, mvd.ReadOnly, !mvd.AttachOnly); err != nil {
            svm.hotRemoveVHDsAtStart(mvds[:i]...)
            svm.hotRemoveVHDsNoLock(mvds[:i]...)
            return err
        }
        svm.attachedVHDs[mvd.HostPath] = 1

@@ -217,17 +217,19 @@ func (svm *serviceVM) hotAddVHDsAtStart(mvds ...hcsshim.MappedVirtualDisk) error
}

// hotRemoveVHDs waits for the service vm to start and then removes the vhds.
// The service VM must not be locked when calling this function.
func (svm *serviceVM) hotRemoveVHDs(mvds ...hcsshim.MappedVirtualDisk) error {
    if err := svm.getStartError(); err != nil {
        return err
    }
    return svm.hotRemoveVHDsAtStart(mvds...)
}

// hotRemoveVHDsAtStart works the same way as hotRemoveVHDs but does not wait for the VM to start.
func (svm *serviceVM) hotRemoveVHDsAtStart(mvds ...hcsshim.MappedVirtualDisk) error {
    svm.Lock()
    defer svm.Unlock()
    return svm.hotRemoveVHDsNoLock(mvds...)
}

// hotRemoveVHDsNoLock removes VHDs from a service VM. When calling this function,
// the contract is the service VM lock must be held.
func (svm *serviceVM) hotRemoveVHDsNoLock(mvds ...hcsshim.MappedVirtualDisk) error {
    var retErr error
    for _, mvd := range mvds {
        if _, ok := svm.attachedVHDs[mvd.HostPath]; !ok {
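The comments above spell out a locking contract: hotRemoveVHDs waits for the VM to start, hotRemoveVHDsAtStart takes the lock itself, and hotRemoveVHDsNoLock assumes the caller already holds it. As a generic illustration of that layering (not the actual serviceVM code; the type and field names below are made up):

package main

import "sync"

// resource stands in for the serviceVM-style pattern above: the public method
// acquires the mutex, the *NoLock variant documents that the caller must
// already hold it.
type resource struct {
    sync.Mutex
    attached map[string]int
}

func (r *resource) remove(paths ...string) error {
    r.Lock()
    defer r.Unlock()
    return r.removeNoLock(paths...)
}

// removeNoLock must only be called with r.Mutex held.
func (r *resource) removeNoLock(paths ...string) error {
    for _, p := range paths {
        if _, ok := r.attached[p]; !ok {
            continue // never attached, nothing to remove
        }
        delete(r.attached, p)
    }
    return nil
}

func main() {
    r := &resource{attached: map[string]int{"/tmp/layer.vhdx": 1}}
    _ = r.remove("/tmp/layer.vhdx")
}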
@@ -862,14 +862,10 @@ func (daemon *Daemon) createSpec(c *container.Container) (*specs.Spec, error) {

    for _, ns := range s.Linux.Namespaces {
        if ns.Type == "network" && ns.Path == "" && !c.Config.NetworkDisabled {
            target, err := os.Readlink(filepath.Join("/proc", strconv.Itoa(os.Getpid()), "exe"))
            if err != nil {
                return nil, err
            }

            target := filepath.Join("/proc", strconv.Itoa(os.Getpid()), "exe")
            s.Hooks = &specs.Hooks{
                Prestart: []specs.Hook{{
                    Path: target, // FIXME: cross-platform
                    Path: target,
                    Args: []string{"libnetwork-setkey", c.ID, daemon.netController.ID()},
                }},
            }
@@ -1,17 +0,0 @@
package main

import (
    "github.com/docker/docker/integration-cli/checker"
    "github.com/go-check/check"
)

func (s *DockerSuite) TestStopContainerWithRestartPolicyAlways(c *check.C) {
    dockerCmd(c, "run", "--name", "verifyRestart1", "-d", "--restart=always", "busybox", "false")
    dockerCmd(c, "run", "--name", "verifyRestart2", "-d", "--restart=always", "busybox", "false")

    c.Assert(waitRun("verifyRestart1"), checker.IsNil)
    c.Assert(waitRun("verifyRestart2"), checker.IsNil)

    dockerCmd(c, "stop", "verifyRestart1")
    dockerCmd(c, "stop", "verifyRestart2")
}
@@ -18,6 +18,46 @@ import (
    "github.com/stretchr/testify/require"
)

func TestStopContainerWithRestartPolicyAlways(t *testing.T) {
    defer setupTest(t)()
    client := request.NewAPIClient(t)
    ctx := context.Background()

    names := []string{"verifyRestart1", "verifyRestart2"}
    for _, name := range names {
        resp, err := client.ContainerCreate(ctx,
            &container.Config{
                Cmd:   []string{"false"},
                Image: "busybox",
            },
            &container.HostConfig{
                RestartPolicy: container.RestartPolicy{
                    Name: "always",
                },
            },
            &network.NetworkingConfig{},
            name,
        )
        require.NoError(t, err)

        err = client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{})
        require.NoError(t, err)
    }

    for _, name := range names {
        poll.WaitOn(t, containerIsInState(ctx, client, name, "running", "restarting"), poll.WithDelay(100*time.Millisecond))
    }

    for _, name := range names {
        err := client.ContainerStop(ctx, name, nil)
        require.NoError(t, err)
    }

    for _, name := range names {
        poll.WaitOn(t, containerIsStopped(ctx, client, name), poll.WithDelay(100*time.Millisecond))
    }
}

func TestDeleteDevicemapper(t *testing.T) {
    skip.IfCondition(t, testEnv.DaemonInfo.Driver != "devicemapper")
@@ -72,3 +112,18 @@ func containerIsStopped(ctx context.Context, client client.APIClient, containerI
        }
    }
}

func containerIsInState(ctx context.Context, client client.APIClient, containerID string, state ...string) func(log poll.LogT) poll.Result {
    return func(log poll.LogT) poll.Result {
        inspect, err := client.ContainerInspect(ctx, containerID)
        if err != nil {
            return poll.Error(err)
        }
        for _, v := range state {
            if inspect.State.Status == v {
                return poll.Success()
            }
        }
        return poll.Continue("waiting for container to be running, currently %s", inspect.State.Status)
    }
}
@@ -1,6 +1,7 @@
package service

import (
    "io/ioutil"
    "runtime"
    "testing"
    "time"

@@ -144,6 +145,168 @@ func TestCreateWithDuplicateNetworkNames(t *testing.T) {
    poll.WaitOn(t, networkIsRemoved(client, n1.ID), poll.WithTimeout(1*time.Minute), poll.WithDelay(10*time.Second))
}
func TestCreateServiceSecretFileMode(t *testing.T) {
    defer setupTest(t)()
    d := newSwarm(t)
    defer d.Stop(t)
    client, err := request.NewClientForHost(d.Sock())
    require.NoError(t, err)

    ctx := context.Background()
    secretResp, err := client.SecretCreate(ctx, swarm.SecretSpec{
        Annotations: swarm.Annotations{
            Name: "TestSecret",
        },
        Data: []byte("TESTSECRET"),
    })
    require.NoError(t, err)

    var instances uint64 = 1
    serviceSpec := swarm.ServiceSpec{
        Annotations: swarm.Annotations{
            Name: "TestService",
        },
        TaskTemplate: swarm.TaskSpec{
            ContainerSpec: &swarm.ContainerSpec{
                Image:   "busybox:latest",
                Command: []string{"/bin/sh", "-c", "ls -l /etc/secret || /bin/top"},
                Secrets: []*swarm.SecretReference{
                    {
                        File: &swarm.SecretReferenceFileTarget{
                            Name: "/etc/secret",
                            UID:  "0",
                            GID:  "0",
                            Mode: 0777,
                        },
                        SecretID:   secretResp.ID,
                        SecretName: "TestSecret",
                    },
                },
            },
        },
        Mode: swarm.ServiceMode{
            Replicated: &swarm.ReplicatedService{
                Replicas: &instances,
            },
        },
    }

    serviceResp, err := client.ServiceCreate(ctx, serviceSpec, types.ServiceCreateOptions{
        QueryRegistry: false,
    })
    require.NoError(t, err)

    poll.WaitOn(t, serviceRunningTasksCount(client, serviceResp.ID, instances))

    filter := filters.NewArgs()
    filter.Add("service", serviceResp.ID)
    tasks, err := client.TaskList(ctx, types.TaskListOptions{
        Filters: filter,
    })
    require.NoError(t, err)
    assert.Equal(t, len(tasks), 1)

    body, err := client.ContainerLogs(ctx, tasks[0].Status.ContainerStatus.ContainerID, types.ContainerLogsOptions{
        ShowStdout: true,
    })
    require.NoError(t, err)
    defer body.Close()

    content, err := ioutil.ReadAll(body)
    require.NoError(t, err)
    assert.Contains(t, string(content), "-rwxrwxrwx")

    err = client.ServiceRemove(ctx, serviceResp.ID)
    require.NoError(t, err)

    poll.WaitOn(t, serviceIsRemoved(client, serviceResp.ID))
    poll.WaitOn(t, noTasks(client))

    err = client.SecretRemove(ctx, "TestSecret")
    require.NoError(t, err)
}
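The assertion on "-rwxrwxrwx" above follows directly from the 0777 file-target mode: that is how Go renders the permission bits. A standalone sketch (not part of the commit) showing the expected string:

package main

import (
    "fmt"
    "os"
)

func main() {
    // os.FileMode(0777).String() renders as "-rwxrwxrwx", which is the
    // substring the secret/config file-mode tests grep for in `ls -l` output.
    fmt.Println(os.FileMode(0777).String())
}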
func TestCreateServiceConfigFileMode(t *testing.T) {
    defer setupTest(t)()
    d := newSwarm(t)
    defer d.Stop(t)
    client, err := request.NewClientForHost(d.Sock())
    require.NoError(t, err)

    ctx := context.Background()
    configResp, err := client.ConfigCreate(ctx, swarm.ConfigSpec{
        Annotations: swarm.Annotations{
            Name: "TestConfig",
        },
        Data: []byte("TESTCONFIG"),
    })
    require.NoError(t, err)

    var instances uint64 = 1
    serviceSpec := swarm.ServiceSpec{
        Annotations: swarm.Annotations{
            Name: "TestService",
        },
        TaskTemplate: swarm.TaskSpec{
            ContainerSpec: &swarm.ContainerSpec{
                Image:   "busybox:latest",
                Command: []string{"/bin/sh", "-c", "ls -l /etc/config || /bin/top"},
                Configs: []*swarm.ConfigReference{
                    {
                        File: &swarm.ConfigReferenceFileTarget{
                            Name: "/etc/config",
                            UID:  "0",
                            GID:  "0",
                            Mode: 0777,
                        },
                        ConfigID:   configResp.ID,
                        ConfigName: "TestConfig",
                    },
                },
            },
        },
        Mode: swarm.ServiceMode{
            Replicated: &swarm.ReplicatedService{
                Replicas: &instances,
            },
        },
    }

    serviceResp, err := client.ServiceCreate(ctx, serviceSpec, types.ServiceCreateOptions{
        QueryRegistry: false,
    })
    require.NoError(t, err)

    poll.WaitOn(t, serviceRunningTasksCount(client, serviceResp.ID, instances))

    filter := filters.NewArgs()
    filter.Add("service", serviceResp.ID)
    tasks, err := client.TaskList(ctx, types.TaskListOptions{
        Filters: filter,
    })
    require.NoError(t, err)
    assert.Equal(t, len(tasks), 1)

    body, err := client.ContainerLogs(ctx, tasks[0].Status.ContainerStatus.ContainerID, types.ContainerLogsOptions{
        ShowStdout: true,
    })
    require.NoError(t, err)
    defer body.Close()

    content, err := ioutil.ReadAll(body)
    require.NoError(t, err)
    assert.Contains(t, string(content), "-rwxrwxrwx")

    err = client.ServiceRemove(ctx, serviceResp.ID)
    require.NoError(t, err)

    poll.WaitOn(t, serviceIsRemoved(client, serviceResp.ID))
    poll.WaitOn(t, noTasks(client))

    err = client.ConfigRemove(ctx, "TestConfig")
    require.NoError(t, err)
}

func swarmServiceSpec(name string, replicas uint64) swarm.ServiceSpec {
    return swarm.ServiceSpec{
        Annotations: swarm.Annotations{
@@ -115,7 +115,7 @@ github.com/dmcgowan/go-tar go1.10
github.com/stevvooe/ttrpc 76e68349ad9ab4d03d764c713826d31216715e4f

# cluster
github.com/docker/swarmkit 713d79dc8799b33465c58ed120b870c52eb5eb4f
github.com/docker/swarmkit 68a376dc30d8c4001767c39456b990dbd821371b
github.com/gogo/protobuf v0.4
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e
components/engine/vendor/github.com/docker/swarmkit/agent/agent.go (2 changes, generated, vendored)
@@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) {
            a.config.SessionTracker.SessionError(err)
        }

        log.G(ctx).WithError(err).Error("agent: session failed")
        backoff = initialSessionFailureBackoff + 2*backoff
        if backoff > maxSessionFailureBackoff {
            backoff = maxSessionFailureBackoff
        }
        log.G(ctx).WithError(err).WithField("backoff", backoff).Errorf("agent: session failed")
    }

    if err := session.close(); err != nil {
components/engine/vendor/github.com/docker/swarmkit/agent/errors.go (2 changes, generated, vendored)
@@ -13,7 +13,7 @@ var (
    errAgentStarted    = errors.New("agent: already started")
    errAgentNotStarted = errors.New("agent: not started")

    errTaskNoContoller          = errors.New("agent: no task controller")
    errTaskNoController         = errors.New("agent: no task controller")
    errTaskNotAssigned          = errors.New("agent: task not assigned")
    errTaskStatusUpdateNoChange = errors.New("agent: no change in task status")
    errTaskUnknown              = errors.New("agent: task unknown")
components/engine/vendor/github.com/docker/swarmkit/agent/session.go (17 changes, generated, vendored)
@@ -65,10 +65,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI
        grpc.WithTransportCredentials(agent.config.Credentials),
        grpc.WithTimeout(dispatcherRPCTimeout),
    )

    if err != nil {
        s.errs <- err
        return s
    }

    log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer())

    s.conn = cc

    go s.run(sessionCtx, delay, description)

@@ -77,6 +81,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI

func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
    timer := time.NewTimer(delay) // delay before registering.
    log.G(ctx).Infof("waiting %v before registering session", delay)
    defer timer.Stop()
    select {
    case <-timer.C:
@@ -166,15 +171,23 @@ func (s *session) heartbeat(ctx context.Context) error {
    heartbeat := time.NewTimer(1) // send out a heartbeat right away
    defer heartbeat.Stop()

    fields := logrus.Fields{
        "sessionID": s.sessionID,
        "method":    "(*session).heartbeat",
    }

    for {
        select {
        case <-heartbeat.C:
            heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
            // TODO(anshul) log manager info in all logs in this function.
            log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout)
            resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
                SessionID: s.sessionID,
            })
            cancel()
            if err != nil {
                log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer())
                if grpc.Code(err) == codes.NotFound {
                    err = errNodeNotRegistered
                }

@@ -182,6 +195,8 @@ func (s *session) heartbeat(ctx context.Context) error {
                return err
            }

            log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period)

            heartbeat.Reset(resp.Period)
        case <-s.closed:
            return errSessionClosed

@@ -408,7 +423,7 @@ func (s *session) sendError(err error) {
    }
}

// close closing session. It should be called only in <-session.errs branch
// close the given session. It should be called only in <-session.errs branch
// of event loop, or when cleaning up the agent.
func (s *session) close() error {
    s.closeOnce.Do(func() {
components/engine/vendor/github.com/docker/swarmkit/connectionbroker/broker.go (6 changes, generated, vendored)
@@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
// connection.
func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
    peer, err := b.remotes.Select()

    if err != nil {
        return nil, err
    }

@@ -98,6 +99,11 @@ type Conn struct {
    peer api.Peer
}

// Peer returns the peer for this Conn.
func (c *Conn) Peer() api.Peer {
    return c.peer
}

// Close closes the client connection if it is a remote connection. It also
// records a positive experience with the remote peer if success is true,
// otherwise it records a negative experience. If a local connection is in use,
components/engine/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go (32 changes, generated, vendored)
@@ -195,6 +195,9 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer {
// Run runs dispatcher tasks which should be run on leader dispatcher.
// Dispatcher can be stopped with cancelling ctx or calling Stop().
func (d *Dispatcher) Run(ctx context.Context) error {
    ctx = log.WithModule(ctx, "dispatcher")
    log.G(ctx).Info("dispatcher starting")

    d.taskUpdatesLock.Lock()
    d.taskUpdates = make(map[string]*api.TaskStatus)
    d.taskUpdatesLock.Unlock()

@@ -208,7 +211,6 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        d.mu.Unlock()
        return errors.New("dispatcher is already running")
    }
    ctx = log.WithModule(ctx, "dispatcher")
    if err := d.markNodesUnknown(ctx); err != nil {
        log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err)
    }

@@ -310,8 +312,12 @@ func (d *Dispatcher) Stop() error {
        d.mu.Unlock()
        return errors.New("dispatcher is already stopped")
    }

    log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
    log.Info("dispatcher stopping")
    d.cancel()
    d.mu.Unlock()

    d.nodes.Clean()

    d.processUpdatesLock.Lock()
@@ -361,13 +367,15 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {
        if node.Status.State == api.NodeStatus_DOWN {
            nodeCopy := node
            expireFunc := func() {
                log.Infof("moving tasks to orphaned state for node: %s", nodeCopy.ID)
                if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil {
                    log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
                    log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, node.ID)
                }

                d.downNodes.Delete(nodeCopy.ID)
            }

            log.Infof(`node %s was found to be down when marking unknown on dispatcher start`, node.ID)
            d.downNodes.Add(nodeCopy, expireFunc)
            return nil
        }

@@ -379,16 +387,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error {

        expireFunc := func() {
            log := log.WithField("node", nodeID)
            log.Debug("heartbeat expiration for unknown node")
            log.Info(`heartbeat expiration for node %s in state "unknown"`, nodeID)
            if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil {
                log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`)
            }
        }
        if err := d.nodes.AddUnknown(node, expireFunc); err != nil {
            return errors.Wrap(err, `adding node in "unknown" state to node store failed`)
            return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID)
        }
        if err := store.UpdateNode(tx, node); err != nil {
            return errors.Wrap(err, "update failed")
            return errors.Wrapf(err, "update for node %s failed", nodeID)
        }
        return nil
    })
@@ -470,6 +478,7 @@ func nodeIPFromContext(ctx context.Context) (string, error) {

// register is used for registration of node with particular dispatcher.
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
    logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
    // prevent register until we're ready to accept it
    dctx, err := d.isRunningLocked()
    if err != nil {

@@ -491,7 +500,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a

    addr, err := nodeIPFromContext(ctx)
    if err != nil {
        log.G(ctx).WithError(err).Debug("failed to get remote node IP")
        logLocal.WithError(err).Debug("failed to get remote node IP")
    }

    if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil {

@@ -499,13 +508,14 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a
    }

    expireFunc := func() {
        log.G(ctx).Debug("heartbeat expiration")
        log.G(ctx).Debug("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN ", nodeID)
        if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil {
            log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration")
        }
    }

    rn := d.nodes.Add(node, expireFunc)
    logLocal.Infof("worker %s was successfully registered", nodeID)

    // NOTE(stevvooe): We need be a little careful with re-registration. The
    // current implementation just matches the node id and then gives away the
@@ -1029,6 +1039,8 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {

// markNodeNotReady sets the node state to some state other than READY
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
    logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady")

    dctx, err := d.isRunningLocked()
    if err != nil {
        return err

@@ -1048,6 +1060,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
    }

    expireFunc := func() {
        log.G(dctx).Debugf(`worker timed-out %s in "down" state, moving all tasks to "ORPHANED" state`, id)
        if err := d.moveTasksToOrphaned(id); err != nil {
            log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`)
        }

@@ -1056,6 +1069,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
    }

    d.downNodes.Add(node, expireFunc)
    logLocal.Debugf("added node %s to down nodes list", node.ID)

    status := &api.NodeStatus{
        State: state,

@@ -1080,6 +1094,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes
    if rn := d.nodes.Delete(id); rn == nil {
        return errors.Errorf("node %s is not found in local storage", id)
    }
    logLocal.Debugf("deleted node %s from node store", node.ID)

    return nil
}

@@ -1094,6 +1109,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a
    }

    period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID)

    log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Debugf("received heartbeat from worker %v, expect next heartbeat in %v", nodeInfo, period)
    return &api.HeartbeatResponse{Period: period}, err
}

@@ -1206,6 +1223,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
        }
    }

    log.Infof("dispatcher session dropped, marking node %s down", nodeID)
    if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil {
        log.WithError(err).Error("failed to remove node")
    }
components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go (2 changes, generated, vendored)
@@ -33,7 +33,7 @@ type logMessage struct {
// LogBroker coordinates log subscriptions to services and tasks. Clients can
// publish and subscribe to logs channels.
//
// Log subscriptions are pushed to the work nodes by creating log subscsription
// Log subscriptions are pushed to the work nodes by creating log subscription
// tasks. As such, the LogBroker also acts as an orchestrator of these tasks.
type LogBroker struct {
    mu sync.RWMutex

@@ -239,6 +239,7 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {

    // Try doing a regular rpc if the receiver doesn't support streaming.
    if grpc.Code(err) == codes.Unimplemented {
        log.G(ctx).Info("sending message to raft peer using ProcessRaftMessage()")
        _, err = api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
    }