diff --git a/components/engine/api/swagger.yaml b/components/engine/api/swagger.yaml index 46136fddb7..cc786e02e5 100644 --- a/components/engine/api/swagger.yaml +++ b/components/engine/api/swagger.yaml @@ -842,12 +842,11 @@ definitions: Volumes: description: "An object mapping mount point paths inside the container to empty objects." type: "object" - properties: - additionalProperties: - type: "object" - enum: - - {} - default: {} + additionalProperties: + type: "object" + enum: + - {} + default: {} WorkingDir: description: "The working directory for commands to run in." type: "string" diff --git a/components/engine/daemon/container_operations_unix.go b/components/engine/daemon/container_operations_unix.go index a123df3a9a..0aaa6c2253 100644 --- a/components/engine/daemon/container_operations_unix.go +++ b/components/engine/daemon/container_operations_unix.go @@ -239,6 +239,9 @@ func (daemon *Daemon) setupSecretDir(c *container.Container) (setupErr error) { if err := os.Chown(fPath, rootIDs.UID+uid, rootIDs.GID+gid); err != nil { return errors.Wrap(err, "error setting ownership for secret") } + if err := os.Chmod(fPath, s.File.Mode); err != nil { + return errors.Wrap(err, "error setting file mode for secret") + } } label.Relabel(localMountPath, c.MountLabel, false) @@ -320,6 +323,9 @@ func (daemon *Daemon) setupConfigDir(c *container.Container) (setupErr error) { if err := os.Chown(fPath, rootIDs.UID+uid, rootIDs.GID+gid); err != nil { return errors.Wrap(err, "error setting ownership for config") } + if err := os.Chmod(fPath, configRef.File.Mode); err != nil { + return errors.Wrap(err, "error setting file mode for config") + } label.Relabel(fPath, c.MountLabel, false) } diff --git a/components/engine/daemon/graphdriver/lcow/lcow_svm.go b/components/engine/daemon/graphdriver/lcow/lcow_svm.go index d4a42df334..87d2eed8f4 100644 --- a/components/engine/daemon/graphdriver/lcow/lcow_svm.go +++ b/components/engine/daemon/graphdriver/lcow/lcow_svm.go @@ -208,7 +208,7 @@ func (svm *serviceVM) hotAddVHDsAtStart(mvds ...hcsshim.MappedVirtualDisk) error } if err := svm.config.HotAddVhd(mvd.HostPath, mvd.ContainerPath, mvd.ReadOnly, !mvd.AttachOnly); err != nil { - svm.hotRemoveVHDsAtStart(mvds[:i]...) + svm.hotRemoveVHDsNoLock(mvds[:i]...) return err } svm.attachedVHDs[mvd.HostPath] = 1 @@ -217,17 +217,19 @@ func (svm *serviceVM) hotAddVHDsAtStart(mvds ...hcsshim.MappedVirtualDisk) error } // hotRemoveVHDs waits for the service vm to start and then removes the vhds. +// The service VM must not be locked when calling this function. func (svm *serviceVM) hotRemoveVHDs(mvds ...hcsshim.MappedVirtualDisk) error { if err := svm.getStartError(); err != nil { return err } - return svm.hotRemoveVHDsAtStart(mvds...) -} - -// hotRemoveVHDsAtStart works the same way as hotRemoveVHDs but does not wait for the VM to start. -func (svm *serviceVM) hotRemoveVHDsAtStart(mvds ...hcsshim.MappedVirtualDisk) error { svm.Lock() defer svm.Unlock() + return svm.hotRemoveVHDsNoLock(mvds...) +} + +// hotRemoveVHDsNoLock removes VHDs from a service VM. When calling this function, +// the contract is the service VM lock must be held. 
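The lcow_svm.go hunk above switches the hot-add error path to a NoLock variant so removal can run while the service VM lock is already held. As a rough illustration of that locked/NoLock convention, the sketch below uses invented names (resourceSet stands in for serviceVM, paths for the mapped VHDs); it is a minimal approximation, not the driver's actual code.

package main

import (
	"fmt"
	"sync"
)

type resourceSet struct {
	sync.Mutex
	attached map[string]int
}

// remove takes the lock and delegates to the NoLock variant.
func (r *resourceSet) remove(paths ...string) {
	r.Lock()
	defer r.Unlock()
	r.removeNoLock(paths...)
}

// removeNoLock assumes the caller already holds the lock, so error paths that
// already run under the lock (as in hotAddVHDsAtStart) can reuse it directly.
func (r *resourceSet) removeNoLock(paths ...string) {
	for _, p := range paths {
		delete(r.attached, p)
	}
}

func main() {
	r := &resourceSet{attached: map[string]int{"/tmp/a.vhdx": 1}}
	r.remove("/tmp/a.vhdx")
	fmt.Println(len(r.attached)) // prints 0
}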
+func (svm *serviceVM) hotRemoveVHDsNoLock(mvds ...hcsshim.MappedVirtualDisk) error { var retErr error for _, mvd := range mvds { if _, ok := svm.attachedVHDs[mvd.HostPath]; !ok { diff --git a/components/engine/daemon/oci_linux.go b/components/engine/daemon/oci_linux.go index 53dc276565..dbc26e8efe 100644 --- a/components/engine/daemon/oci_linux.go +++ b/components/engine/daemon/oci_linux.go @@ -862,14 +862,10 @@ func (daemon *Daemon) createSpec(c *container.Container) (*specs.Spec, error) { for _, ns := range s.Linux.Namespaces { if ns.Type == "network" && ns.Path == "" && !c.Config.NetworkDisabled { - target, err := os.Readlink(filepath.Join("/proc", strconv.Itoa(os.Getpid()), "exe")) - if err != nil { - return nil, err - } - + target := filepath.Join("/proc", strconv.Itoa(os.Getpid()), "exe") s.Hooks = &specs.Hooks{ Prestart: []specs.Hook{{ - Path: target, // FIXME: cross-platform + Path: target, Args: []string{"libnetwork-setkey", c.ID, daemon.netController.ID()}, }}, } diff --git a/components/engine/integration-cli/docker_cli_stop_test.go b/components/engine/integration-cli/docker_cli_stop_test.go deleted file mode 100644 index 1be41203b3..0000000000 --- a/components/engine/integration-cli/docker_cli_stop_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package main - -import ( - "github.com/docker/docker/integration-cli/checker" - "github.com/go-check/check" -) - -func (s *DockerSuite) TestStopContainerWithRestartPolicyAlways(c *check.C) { - dockerCmd(c, "run", "--name", "verifyRestart1", "-d", "--restart=always", "busybox", "false") - dockerCmd(c, "run", "--name", "verifyRestart2", "-d", "--restart=always", "busybox", "false") - - c.Assert(waitRun("verifyRestart1"), checker.IsNil) - c.Assert(waitRun("verifyRestart2"), checker.IsNil) - - dockerCmd(c, "stop", "verifyRestart1") - dockerCmd(c, "stop", "verifyRestart2") -} diff --git a/components/engine/integration/container/stop_test.go b/components/engine/integration/container/stop_test.go index feecc6901f..dadf768bb7 100644 --- a/components/engine/integration/container/stop_test.go +++ b/components/engine/integration/container/stop_test.go @@ -18,6 +18,46 @@ import ( "github.com/stretchr/testify/require" ) +func TestStopContainerWithRestartPolicyAlways(t *testing.T) { + defer setupTest(t)() + client := request.NewAPIClient(t) + ctx := context.Background() + + names := []string{"verifyRestart1", "verifyRestart2"} + for _, name := range names { + resp, err := client.ContainerCreate(ctx, + &container.Config{ + Cmd: []string{"false"}, + Image: "busybox", + }, + &container.HostConfig{ + RestartPolicy: container.RestartPolicy{ + Name: "always", + }, + }, + &network.NetworkingConfig{}, + name, + ) + require.NoError(t, err) + + err = client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}) + require.NoError(t, err) + } + + for _, name := range names { + poll.WaitOn(t, containerIsInState(ctx, client, name, "running", "restarting"), poll.WithDelay(100*time.Millisecond)) + } + + for _, name := range names { + err := client.ContainerStop(ctx, name, nil) + require.NoError(t, err) + } + + for _, name := range names { + poll.WaitOn(t, containerIsStopped(ctx, client, name), poll.WithDelay(100*time.Millisecond)) + } +} + func TestDeleteDevicemapper(t *testing.T) { skip.IfCondition(t, testEnv.DaemonInfo.Driver != "devicemapper") @@ -72,3 +112,18 @@ func containerIsStopped(ctx context.Context, client client.APIClient, containerI } } } + +func containerIsInState(ctx context.Context, client client.APIClient, containerID string, state ...string) func(log 
poll.LogT) poll.Result { + return func(log poll.LogT) poll.Result { + inspect, err := client.ContainerInspect(ctx, containerID) + if err != nil { + return poll.Error(err) + } + for _, v := range state { + if inspect.State.Status == v { + return poll.Success() + } + } + return poll.Continue("waiting for container to be running, currently %s", inspect.State.Status) + } +} diff --git a/components/engine/integration/service/create_test.go b/components/engine/integration/service/create_test.go index 7e78b94992..fb8ea32b43 100644 --- a/components/engine/integration/service/create_test.go +++ b/components/engine/integration/service/create_test.go @@ -1,6 +1,7 @@ package service import ( + "io/ioutil" "runtime" "testing" "time" @@ -144,6 +145,168 @@ func TestCreateWithDuplicateNetworkNames(t *testing.T) { poll.WaitOn(t, networkIsRemoved(client, n1.ID), poll.WithTimeout(1*time.Minute), poll.WithDelay(10*time.Second)) } +func TestCreateServiceSecretFileMode(t *testing.T) { + defer setupTest(t)() + d := newSwarm(t) + defer d.Stop(t) + client, err := request.NewClientForHost(d.Sock()) + require.NoError(t, err) + + ctx := context.Background() + secretResp, err := client.SecretCreate(ctx, swarm.SecretSpec{ + Annotations: swarm.Annotations{ + Name: "TestSecret", + }, + Data: []byte("TESTSECRET"), + }) + require.NoError(t, err) + + var instances uint64 = 1 + serviceSpec := swarm.ServiceSpec{ + Annotations: swarm.Annotations{ + Name: "TestService", + }, + TaskTemplate: swarm.TaskSpec{ + ContainerSpec: &swarm.ContainerSpec{ + Image: "busybox:latest", + Command: []string{"/bin/sh", "-c", "ls -l /etc/secret || /bin/top"}, + Secrets: []*swarm.SecretReference{ + { + File: &swarm.SecretReferenceFileTarget{ + Name: "/etc/secret", + UID: "0", + GID: "0", + Mode: 0777, + }, + SecretID: secretResp.ID, + SecretName: "TestSecret", + }, + }, + }, + }, + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &instances, + }, + }, + } + + serviceResp, err := client.ServiceCreate(ctx, serviceSpec, types.ServiceCreateOptions{ + QueryRegistry: false, + }) + require.NoError(t, err) + + poll.WaitOn(t, serviceRunningTasksCount(client, serviceResp.ID, instances)) + + filter := filters.NewArgs() + filter.Add("service", serviceResp.ID) + tasks, err := client.TaskList(ctx, types.TaskListOptions{ + Filters: filter, + }) + require.NoError(t, err) + assert.Equal(t, len(tasks), 1) + + body, err := client.ContainerLogs(ctx, tasks[0].Status.ContainerStatus.ContainerID, types.ContainerLogsOptions{ + ShowStdout: true, + }) + require.NoError(t, err) + defer body.Close() + + content, err := ioutil.ReadAll(body) + require.NoError(t, err) + assert.Contains(t, string(content), "-rwxrwxrwx") + + err = client.ServiceRemove(ctx, serviceResp.ID) + require.NoError(t, err) + + poll.WaitOn(t, serviceIsRemoved(client, serviceResp.ID)) + poll.WaitOn(t, noTasks(client)) + + err = client.SecretRemove(ctx, "TestSecret") + require.NoError(t, err) +} + +func TestCreateServiceConfigFileMode(t *testing.T) { + defer setupTest(t)() + d := newSwarm(t) + defer d.Stop(t) + client, err := request.NewClientForHost(d.Sock()) + require.NoError(t, err) + + ctx := context.Background() + configResp, err := client.ConfigCreate(ctx, swarm.ConfigSpec{ + Annotations: swarm.Annotations{ + Name: "TestConfig", + }, + Data: []byte("TESTCONFIG"), + }) + require.NoError(t, err) + + var instances uint64 = 1 + serviceSpec := swarm.ServiceSpec{ + Annotations: swarm.Annotations{ + Name: "TestService", + }, + TaskTemplate: swarm.TaskSpec{ + ContainerSpec: 
&swarm.ContainerSpec{ + Image: "busybox:latest", + Command: []string{"/bin/sh", "-c", "ls -l /etc/config || /bin/top"}, + Configs: []*swarm.ConfigReference{ + { + File: &swarm.ConfigReferenceFileTarget{ + Name: "/etc/config", + UID: "0", + GID: "0", + Mode: 0777, + }, + ConfigID: configResp.ID, + ConfigName: "TestConfig", + }, + }, + }, + }, + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &instances, + }, + }, + } + + serviceResp, err := client.ServiceCreate(ctx, serviceSpec, types.ServiceCreateOptions{ + QueryRegistry: false, + }) + require.NoError(t, err) + + poll.WaitOn(t, serviceRunningTasksCount(client, serviceResp.ID, instances)) + + filter := filters.NewArgs() + filter.Add("service", serviceResp.ID) + tasks, err := client.TaskList(ctx, types.TaskListOptions{ + Filters: filter, + }) + require.NoError(t, err) + assert.Equal(t, len(tasks), 1) + + body, err := client.ContainerLogs(ctx, tasks[0].Status.ContainerStatus.ContainerID, types.ContainerLogsOptions{ + ShowStdout: true, + }) + require.NoError(t, err) + defer body.Close() + + content, err := ioutil.ReadAll(body) + require.NoError(t, err) + assert.Contains(t, string(content), "-rwxrwxrwx") + + err = client.ServiceRemove(ctx, serviceResp.ID) + require.NoError(t, err) + + poll.WaitOn(t, serviceIsRemoved(client, serviceResp.ID)) + poll.WaitOn(t, noTasks(client)) + + err = client.ConfigRemove(ctx, "TestConfig") + require.NoError(t, err) +} + func swarmServiceSpec(name string, replicas uint64) swarm.ServiceSpec { return swarm.ServiceSpec{ Annotations: swarm.Annotations{ diff --git a/components/engine/vendor.conf b/components/engine/vendor.conf index 5ae8a0393e..dc06f6a6fb 100644 --- a/components/engine/vendor.conf +++ b/components/engine/vendor.conf @@ -115,7 +115,7 @@ github.com/dmcgowan/go-tar go1.10 github.com/stevvooe/ttrpc 76e68349ad9ab4d03d764c713826d31216715e4f # cluster -github.com/docker/swarmkit 713d79dc8799b33465c58ed120b870c52eb5eb4f +github.com/docker/swarmkit 68a376dc30d8c4001767c39456b990dbd821371b github.com/gogo/protobuf v0.4 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e diff --git a/components/engine/vendor/github.com/docker/swarmkit/agent/agent.go b/components/engine/vendor/github.com/docker/swarmkit/agent/agent.go index dc72fa0407..d28be94385 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/agent/agent.go +++ b/components/engine/vendor/github.com/docker/swarmkit/agent/agent.go @@ -333,11 +333,11 @@ func (a *Agent) run(ctx context.Context) { a.config.SessionTracker.SessionError(err) } - log.G(ctx).WithError(err).Error("agent: session failed") backoff = initialSessionFailureBackoff + 2*backoff if backoff > maxSessionFailureBackoff { backoff = maxSessionFailureBackoff } + log.G(ctx).WithError(err).WithField("backoff", backoff).Errorf("agent: session failed") } if err := session.close(); err != nil { diff --git a/components/engine/vendor/github.com/docker/swarmkit/agent/errors.go b/components/engine/vendor/github.com/docker/swarmkit/agent/errors.go index c45f99b1ea..29f8ff1c9f 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/agent/errors.go +++ b/components/engine/vendor/github.com/docker/swarmkit/agent/errors.go @@ -13,7 +13,7 @@ var ( errAgentStarted = errors.New("agent: already started") errAgentNotStarted = errors.New("agent: not started") - errTaskNoContoller = errors.New("agent: no task controller") + errTaskNoController = errors.New("agent: no 
task controller") errTaskNotAssigned = errors.New("agent: task not assigned") errTaskStatusUpdateNoChange = errors.New("agent: no change in task status") errTaskUnknown = errors.New("agent: task unknown") diff --git a/components/engine/vendor/github.com/docker/swarmkit/agent/session.go b/components/engine/vendor/github.com/docker/swarmkit/agent/session.go index 0c00c4f1bf..9bb9773a6c 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/agent/session.go +++ b/components/engine/vendor/github.com/docker/swarmkit/agent/session.go @@ -65,10 +65,14 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI grpc.WithTransportCredentials(agent.config.Credentials), grpc.WithTimeout(dispatcherRPCTimeout), ) + if err != nil { s.errs <- err return s } + + log.G(ctx).Infof("manager selected by agent for new session: %v", cc.Peer()) + s.conn = cc go s.run(sessionCtx, delay, description) @@ -77,6 +81,7 @@ func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionI func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) { timer := time.NewTimer(delay) // delay before registering. + log.G(ctx).Infof("waiting %v before registering session", delay) defer timer.Stop() select { case <-timer.C: @@ -166,15 +171,23 @@ func (s *session) heartbeat(ctx context.Context) error { heartbeat := time.NewTimer(1) // send out a heartbeat right away defer heartbeat.Stop() + fields := logrus.Fields{ + "sessionID": s.sessionID, + "method": "(*session).heartbeat", + } + for { select { case <-heartbeat.C: heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout) + // TODO(anshul) log manager info in all logs in this function. + log.G(ctx).WithFields(fields).Debugf("sending heartbeat to manager %v with timeout %v", s.conn.Peer(), dispatcherRPCTimeout) resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{ SessionID: s.sessionID, }) cancel() if err != nil { + log.G(ctx).WithFields(fields).WithError(err).Errorf("heartbeat to manager %v failed", s.conn.Peer()) if grpc.Code(err) == codes.NotFound { err = errNodeNotRegistered } @@ -182,6 +195,8 @@ func (s *session) heartbeat(ctx context.Context) error { return err } + log.G(ctx).WithFields(fields).Debugf("heartbeat successful to manager %v, next heartbeat period: %v", s.conn.Peer(), resp.Period) + heartbeat.Reset(resp.Period) case <-s.closed: return errSessionClosed @@ -408,7 +423,7 @@ func (s *session) sendError(err error) { } } -// close closing session. It should be called only in <-session.errs branch +// close the given session. It should be called only in <-session.errs branch // of event loop, or when cleaning up the agent. func (s *session) close() error { s.closeOnce.Do(func() { diff --git a/components/engine/vendor/github.com/docker/swarmkit/connectionbroker/broker.go b/components/engine/vendor/github.com/docker/swarmkit/connectionbroker/broker.go index 43b384ab2a..a5510a9ff0 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/connectionbroker/broker.go +++ b/components/engine/vendor/github.com/docker/swarmkit/connectionbroker/broker.go @@ -58,6 +58,7 @@ func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) { // connection. func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) { peer, err := b.remotes.Select() + if err != nil { return nil, err } @@ -98,6 +99,11 @@ type Conn struct { peer api.Peer } +// Peer returns the peer for this Conn. 
+func (c *Conn) Peer() api.Peer { + return c.peer +} + // Close closes the client connection if it is a remote connection. It also // records a positive experience with the remote peer if success is true, // otherwise it records a negative experience. If a local connection is in use, diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/components/engine/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go index 13d68293ae..12c2a81e34 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go @@ -195,6 +195,9 @@ func getWeightedPeers(cluster Cluster) []*api.WeightedPeer { // Run runs dispatcher tasks which should be run on leader dispatcher. // Dispatcher can be stopped with cancelling ctx or calling Stop(). func (d *Dispatcher) Run(ctx context.Context) error { + ctx = log.WithModule(ctx, "dispatcher") + log.G(ctx).Info("dispatcher starting") + d.taskUpdatesLock.Lock() d.taskUpdates = make(map[string]*api.TaskStatus) d.taskUpdatesLock.Unlock() @@ -208,7 +211,6 @@ func (d *Dispatcher) Run(ctx context.Context) error { d.mu.Unlock() return errors.New("dispatcher is already running") } - ctx = log.WithModule(ctx, "dispatcher") if err := d.markNodesUnknown(ctx); err != nil { log.G(ctx).Errorf(`failed to move all nodes to "unknown" state: %v`, err) } @@ -310,8 +312,12 @@ func (d *Dispatcher) Stop() error { d.mu.Unlock() return errors.New("dispatcher is already stopped") } + + log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop") + log.Info("dispatcher stopping") d.cancel() d.mu.Unlock() + d.nodes.Clean() d.processUpdatesLock.Lock() @@ -361,13 +367,15 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error { if node.Status.State == api.NodeStatus_DOWN { nodeCopy := node expireFunc := func() { + log.Infof("moving tasks to orphaned state for node: %s", nodeCopy.ID) if err := d.moveTasksToOrphaned(nodeCopy.ID); err != nil { - log.WithError(err).Error(`failed to move all tasks to "ORPHANED" state`) + log.WithError(err).Errorf(`failed to move all tasks for node %s to "ORPHANED" state`, node.ID) } d.downNodes.Delete(nodeCopy.ID) } + log.Infof(`node %s was found to be down when marking unknown on dispatcher start`, node.ID) d.downNodes.Add(nodeCopy, expireFunc) return nil } @@ -379,16 +387,16 @@ func (d *Dispatcher) markNodesUnknown(ctx context.Context) error { expireFunc := func() { log := log.WithField("node", nodeID) - log.Debug("heartbeat expiration for unknown node") + log.Info(`heartbeat expiration for node %s in state "unknown"`, nodeID) if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, `heartbeat failure for node in "unknown" state`); err != nil { log.WithError(err).Error(`failed deregistering node after heartbeat expiration for node in "unknown" state`) } } if err := d.nodes.AddUnknown(node, expireFunc); err != nil { - return errors.Wrap(err, `adding node in "unknown" state to node store failed`) + return errors.Wrapf(err, `adding node %s in "unknown" state to node store failed`, nodeID) } if err := store.UpdateNode(tx, node); err != nil { - return errors.Wrap(err, "update failed") + return errors.Wrapf(err, "update for node %s failed", nodeID) } return nil }) @@ -470,6 +478,7 @@ func nodeIPFromContext(ctx context.Context) (string, error) { // register is used for registration of node with particular dispatcher. 
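Several of the dispatcher hunks in this patch attach an expireFunc to a heartbeat-tracked node store, so that a missed heartbeat eventually marks the node down and moves its tasks to the orphaned state. The sketch below illustrates only that callback pattern with a hypothetical store built on time.AfterFunc; the names and the fixed TTL are invented and far simpler than the dispatcher's real node store.

package main

import (
	"fmt"
	"sync"
	"time"
)

type expiringStore struct {
	mu     sync.Mutex
	timers map[string]*time.Timer
}

func newExpiringStore() *expiringStore {
	return &expiringStore{timers: map[string]*time.Timer{}}
}

// Add arms a timer for the node; expire runs if no heartbeat arrives in time.
func (s *expiringStore) Add(nodeID string, ttl time.Duration, expire func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.timers[nodeID] = time.AfterFunc(ttl, expire)
}

// Heartbeat resets the timer, postponing expiration.
func (s *expiringStore) Heartbeat(nodeID string, ttl time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if t, ok := s.timers[nodeID]; ok {
		t.Reset(ttl)
	}
}

func main() {
	store := newExpiringStore()
	store.Add("node-1", 20*time.Millisecond, func() {
		fmt.Println("heartbeat expired: marking node-1 down and orphaning its tasks")
	})
	time.Sleep(50 * time.Millisecond)
}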
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) { + logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register") // prevent register until we're ready to accept it dctx, err := d.isRunningLocked() if err != nil { @@ -491,7 +500,7 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a addr, err := nodeIPFromContext(ctx) if err != nil { - log.G(ctx).WithError(err).Debug("failed to get remote node IP") + logLocal.WithError(err).Debug("failed to get remote node IP") } if err := d.markNodeReady(dctx, nodeID, description, addr); err != nil { @@ -499,13 +508,14 @@ func (d *Dispatcher) register(ctx context.Context, nodeID string, description *a } expireFunc := func() { - log.G(ctx).Debug("heartbeat expiration") + log.G(ctx).Debug("heartbeat expiration for worker %s, setting worker status to NodeStatus_DOWN ", nodeID) if err := d.markNodeNotReady(nodeID, api.NodeStatus_DOWN, "heartbeat failure"); err != nil { log.G(ctx).WithError(err).Errorf("failed deregistering node after heartbeat expiration") } } rn := d.nodes.Add(node, expireFunc) + logLocal.Infof("worker %s was successfully registered", nodeID) // NOTE(stevvooe): We need be a little careful with re-registration. The // current implementation just matches the node id and then gives away the @@ -1029,6 +1039,8 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error { // markNodeNotReady sets the node state to some state other than READY func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error { + logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady") + dctx, err := d.isRunningLocked() if err != nil { return err @@ -1048,6 +1060,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes } expireFunc := func() { + log.G(dctx).Debugf(`worker timed-out %s in "down" state, moving all tasks to "ORPHANED" state`, id) if err := d.moveTasksToOrphaned(id); err != nil { log.G(dctx).WithError(err).Error(`failed to move all tasks to "ORPHANED" state`) } @@ -1056,6 +1069,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes } d.downNodes.Add(node, expireFunc) + logLocal.Debugf("added node %s to down nodes list", node.ID) status := &api.NodeStatus{ State: state, @@ -1080,6 +1094,7 @@ func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, mes if rn := d.nodes.Delete(id); rn == nil { return errors.Errorf("node %s is not found in local storage", id) } + logLocal.Debugf("deleted node %s from node store", node.ID) return nil } @@ -1094,6 +1109,8 @@ func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*a } period, err := d.nodes.Heartbeat(nodeInfo.NodeID, r.SessionID) + + log.G(ctx).WithField("method", "(*Dispatcher).Heartbeat").Debugf("received heartbeat from worker %v, expect next heartbeat in %v", nodeInfo, period) return &api.HeartbeatResponse{Period: period}, err } @@ -1206,6 +1223,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio } } + log.Infof("dispatcher session dropped, marking node %s down", nodeID) if err := d.markNodeNotReady(nodeID, api.NodeStatus_DISCONNECTED, "node is currently trying to find new manager"); err != nil { log.WithError(err).Error("failed to remove node") } diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go 
b/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go index de2b93633f..dfc898e070 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go @@ -33,7 +33,7 @@ type logMessage struct { // LogBroker coordinates log subscriptions to services and tasks. Clients can // publish and subscribe to logs channels. // -// Log subscriptions are pushed to the work nodes by creating log subscsription +// Log subscriptions are pushed to the work nodes by creating log subscription // tasks. As such, the LogBroker also acts as an orchestrator of these tasks. type LogBroker struct { mu sync.RWMutex diff --git a/components/engine/vendor/github.com/docker/swarmkit/manager/state/raft/transport/peer.go b/components/engine/vendor/github.com/docker/swarmkit/manager/state/raft/transport/peer.go index eb849c0803..bdd3ec0293 100644 --- a/components/engine/vendor/github.com/docker/swarmkit/manager/state/raft/transport/peer.go +++ b/components/engine/vendor/github.com/docker/swarmkit/manager/state/raft/transport/peer.go @@ -239,6 +239,7 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error { // Try doing a regular rpc if the receiver doesn't support streaming. if grpc.Code(err) == codes.Unimplemented { + log.G(ctx).Info("sending message to raft peer using ProcessRaftMessage()") _, err = api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m}) }
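The peer.go hunk above adds a log line where the raft transport falls back from the streaming RPC to the unary ProcessRaftMessage call after the receiver reports codes.Unimplemented. The sketch below captures only that fallback shape; the raftSender interface and message type are invented stand-ins for the generated gRPC client, and status.Code is used here in place of the older grpc.Code helper seen in the diff.

package main

import (
	"context"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type message struct{ payload []byte }

// raftSender is a hypothetical stand-in for the generated raft gRPC client.
type raftSender interface {
	StreamRaftMessage(ctx context.Context, m message) error  // newer streaming path
	ProcessRaftMessage(ctx context.Context, m message) error // legacy unary path
}

func sendWithFallback(ctx context.Context, c raftSender, m message) error {
	if err := c.StreamRaftMessage(ctx, m); status.Code(err) != codes.Unimplemented {
		// Success, or a failure unrelated to missing streaming support.
		return err
	}
	// Receiver runs an older version without streaming support; retry the same
	// message over the unary RPC, mirroring the codes.Unimplemented check above.
	return c.ProcessRaftMessage(ctx, m)
}

func main() {}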