Re-vendors swarmkit to include the following fixes:

- https://github.com/docker/swarmkit/pull/2288 (Allow updates of failed services with restart policy "none")
- https://github.com/docker/swarmkit/pull/2304 (Reset restart history when task spec changes)
- https://github.com/docker/swarmkit/pull/2309 (updating the service spec version when rolling back)
- https://github.com/docker/swarmkit/pull/2310 (fix for slow swarm shutdown)

Signed-off-by: Ying <ying.li@docker.com>
This commit is contained in:
Ying
2017-07-24 18:01:58 -07:00
parent bd4b12ce5d
commit 1dd11b3bb6
8 changed files with 120 additions and 82 deletions

View File

@ -106,7 +106,7 @@ github.com/stevvooe/continuity cd7a8e21e2b6f84799f5dd4b65faf49c8d3ee02d
github.com/tonistiigi/fsutil 0ac4c11b053b9c5c7c47558f81f96c7100ce50fb
# cluster
github.com/docker/swarmkit 3e2dd3c0a76149b1620b42d28dd6ff48270404e5
github.com/docker/swarmkit 069d13ff72a214cdd7a06821299987b28ee2a627
github.com/gogo/protobuf v0.4
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e
@ -143,4 +143,4 @@ github.com/opencontainers/selinux v1.0.0-rc1
# git --git-dir ./go/.git --work-tree ./go checkout revert-prefix-ignore
# cp -a go/src/archive/tar ./vendor/archive/tar
# rm -rf ./go
# vndr
# vndr

View File

@ -57,12 +57,12 @@ func New(store *store.MemoryStore) *LogBroker {
}
}
// Run the log broker
func (lb *LogBroker) Run(ctx context.Context) error {
// Start starts the log broker
func (lb *LogBroker) Start(ctx context.Context) error {
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.cancelAll != nil {
lb.mu.Unlock()
return errAlreadyRunning
}
@ -71,12 +71,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
lb.subscriptionQueue = watch.NewQueue()
lb.registeredSubscriptions = make(map[string]*subscription)
lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
lb.mu.Unlock()
select {
case <-lb.pctx.Done():
return lb.pctx.Err()
}
return nil
}
// Stop stops the log broker
@ -234,8 +229,15 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
return err
}
lb.mu.Lock()
pctx := lb.pctx
lb.mu.Unlock()
if pctx == nil {
return errNotRunning
}
subscription := lb.newSubscription(request.Selector, request.Options)
subscription.Run(lb.pctx)
subscription.Run(pctx)
defer subscription.Stop()
log := log.G(ctx).WithFields(
@ -257,8 +259,8 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
select {
case <-ctx.Done():
return ctx.Err()
case <-lb.pctx.Done():
return lb.pctx.Err()
case <-pctx.Done():
return pctx.Err()
case event := <-publishCh:
publish := event.(*logMessage)
if publish.completed {
@ -308,6 +310,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
return err
}
lb.mu.Lock()
pctx := lb.pctx
lb.mu.Unlock()
if pctx == nil {
return errNotRunning
}
lb.nodeConnected(remote.NodeID)
defer lb.nodeDisconnected(remote.NodeID)
@ -329,7 +338,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
select {
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
case <-pctx.Done():
return nil
default:
}
@ -362,7 +371,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
case <-pctx.Done():
return nil
}
}

View File

@ -130,6 +130,7 @@ type Manager struct {
caserver *ca.Server
dispatcher *dispatcher.Dispatcher
logbroker *logbroker.LogBroker
watchServer *watchapi.Server
replicatedOrchestrator *replicated.Orchestrator
globalOrchestrator *global.Orchestrator
taskReaper *taskreaper.TaskReaper
@ -221,6 +222,7 @@ func New(config *Config) (*Manager, error) {
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig(), drivers.New(config.PluginGetter)),
logbroker: logbroker.New(raftNode.MemoryStore()),
watchServer: watchapi.NewServer(raftNode.MemoryStore()),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
raftNode: raftNode,
@ -398,13 +400,12 @@ func (m *Manager) Run(parent context.Context) error {
}
baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.caserver, m.config.PluginGetter)
baseWatchAPI := watchapi.NewServer(m.raftNode.MemoryStore())
baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
healthServer := health.NewHealthServer()
localHealthServer := health.NewHealthServer()
authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(baseWatchAPI, authorize)
authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
@ -477,7 +478,7 @@ func (m *Manager) Run(parent context.Context) error {
grpc_prometheus.Register(m.server)
api.RegisterControlServer(m.localserver, localProxyControlAPI)
api.RegisterWatchServer(m.localserver, baseWatchAPI)
api.RegisterWatchServer(m.localserver, m.watchServer)
api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
api.RegisterHealthServer(m.localserver, localHealthServer)
api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
@ -1001,11 +1002,13 @@ func (m *Manager) becomeLeader(ctx context.Context) {
}
}(m.dispatcher)
go func(lb *logbroker.LogBroker) {
if err := lb.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("LogBroker exited with an error")
}
}(m.logbroker)
if err := m.logbroker.Start(ctx); err != nil {
log.G(ctx).WithError(err).Error("LogBroker failed to start")
}
if err := m.watchServer.Start(ctx); err != nil {
log.G(ctx).WithError(err).Error("watch server failed to start")
}
go func(server *ca.Server) {
if err := server.Run(ctx); err != nil {
@ -1059,6 +1062,7 @@ func (m *Manager) becomeLeader(ctx context.Context) {
func (m *Manager) becomeFollower() {
m.dispatcher.Stop()
m.logbroker.Stop()
m.watchServer.Stop()
m.caserver.Stop()
if m.allocator != nil {

View File

@ -169,12 +169,6 @@ func (g *Orchestrator) Run(ctx context.Context) error {
delete(g.nodes, v.Node.ID)
case api.EventUpdateTask:
g.handleTaskChange(ctx, v.Task)
case api.EventDeleteTask:
// CLI allows deleting task
if _, exists := g.globalServices[v.Task.ServiceID]; !exists {
continue
}
g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID)
}
case <-g.stopChan:
return nil
@ -216,7 +210,7 @@ func (g *Orchestrator) handleTaskChange(ctx context.Context, t *api.Task) {
if _, exists := g.globalServices[t.ServiceID]; !exists {
return
}
// if a task's DesiredState has past running, which
// if a task's DesiredState has passed running, it
// means the task has been processed
if t.DesiredState > api.TaskStateRunning {
return
@ -264,7 +258,6 @@ func (g *Orchestrator) foreachTaskFromNode(ctx context.Context, node *api.Node,
}
func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
nodeCompleted := make(map[string]map[string]struct{})
nodeTasks := make(map[string]map[string][]*api.Task)
g.store.View(func(tx store.ReadTx) {
@ -275,8 +268,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
continue
}
// a node may have completed this service
nodeCompleted[serviceID] = make(map[string]struct{})
// nodeID -> task list
nodeTasks[serviceID] = make(map[string][]*api.Task)
@ -284,11 +275,6 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
if t.DesiredState <= api.TaskStateRunning {
// Collect all running instances of this service
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
} else {
// for finished tasks, check restartPolicy
if isTaskCompleted(t, orchestrator.RestartCondition(t)) {
nodeCompleted[serviceID][t.NodeID] = struct{}{}
}
}
}
}
@ -311,9 +297,7 @@ func (g *Orchestrator) reconcileServices(ctx context.Context, serviceIDs []strin
ntasks := nodeTasks[serviceID][nodeID]
delete(nodeTasks[serviceID], nodeID)
// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
if !meetsConstraints {
g.shutdownTasks(ctx, batch, ntasks)
continue
}
@ -400,8 +384,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
return
}
// whether each service has completed on the node
completed := make(map[string]bool)
// tasks by service
tasks := make(map[string][]*api.Task)
@ -425,10 +407,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
}
if t.DesiredState <= api.TaskStateRunning {
tasks[serviceID] = append(tasks[serviceID], t)
} else {
if isTaskCompleted(t, orchestrator.RestartCondition(t)) {
completed[serviceID] = true
}
}
}
}
@ -444,13 +422,6 @@ func (g *Orchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs
continue
}
// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed[serviceID] {
g.shutdownTasks(ctx, batch, tasks[serviceID])
continue
}
if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update tasks
continue

View File

@ -30,6 +30,13 @@ type instanceRestartInfo struct {
// Restart.MaxAttempts and Restart.Window are both
// nonzero.
restartedInstances *list.List
// Why is specVersion in this structure and not in the map key? While
// putting it in the key would be a very simple solution, it wouldn't
// be easy to clean up map entries corresponding to old specVersions.
// Making the key version-agnostic and clearing the value whenever the
// version changes avoids the issue of stale map entries for old
// versions.
specVersion api.Version
}
type delayedStart struct {
@ -54,8 +61,7 @@ type Supervisor struct {
mu sync.Mutex
store *store.MemoryStore
delays map[string]*delayedStart
history map[instanceTuple]*instanceRestartInfo
historyByService map[string]map[instanceTuple]struct{}
historyByService map[string]map[instanceTuple]*instanceRestartInfo
TaskTimeout time.Duration
}
@ -64,8 +70,7 @@ func NewSupervisor(store *store.MemoryStore) *Supervisor {
return &Supervisor{
store: store,
delays: make(map[string]*delayedStart),
history: make(map[instanceTuple]*instanceRestartInfo),
historyByService: make(map[string]map[instanceTuple]struct{}),
historyByService: make(map[string]map[instanceTuple]*instanceRestartInfo),
TaskTimeout: defaultOldTaskTimeout,
}
}
@ -214,8 +219,8 @@ func (r *Supervisor) shouldRestart(ctx context.Context, t *api.Task, service *ap
r.mu.Lock()
defer r.mu.Unlock()
restartInfo := r.history[instanceTuple]
if restartInfo == nil {
restartInfo := r.historyByService[t.ServiceID][instanceTuple]
if restartInfo == nil || (t.SpecVersion != nil && *t.SpecVersion != restartInfo.specVersion) {
return true
}
@ -268,17 +273,26 @@ func (r *Supervisor) recordRestartHistory(restartTask *api.Task) {
r.mu.Lock()
defer r.mu.Unlock()
if r.history[tuple] == nil {
r.history[tuple] = &instanceRestartInfo{}
}
restartInfo := r.history[tuple]
restartInfo.totalRestarts++
if r.historyByService[restartTask.ServiceID] == nil {
r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]struct{})
r.historyByService[restartTask.ServiceID] = make(map[instanceTuple]*instanceRestartInfo)
}
r.historyByService[restartTask.ServiceID][tuple] = struct{}{}
if r.historyByService[restartTask.ServiceID][tuple] == nil {
r.historyByService[restartTask.ServiceID][tuple] = &instanceRestartInfo{}
}
restartInfo := r.historyByService[restartTask.ServiceID][tuple]
if restartTask.SpecVersion != nil && *restartTask.SpecVersion != restartInfo.specVersion {
// This task has a different SpecVersion from the one we're
// tracking. Most likely, the service was updated. Past failures
// shouldn't count against the new service definition, so clear
// the history for this instance.
*restartInfo = instanceRestartInfo{
specVersion: *restartTask.SpecVersion,
}
}
restartInfo.totalRestarts++
if restartTask.Spec.Restart.Window != nil && (restartTask.Spec.Restart.Window.Seconds != 0 || restartTask.Spec.Restart.Window.Nanos != 0) {
if restartInfo.restartedInstances == nil {
@ -432,16 +446,6 @@ func (r *Supervisor) CancelAll() {
// ClearServiceHistory forgets restart history related to a given service ID.
func (r *Supervisor) ClearServiceHistory(serviceID string) {
r.mu.Lock()
defer r.mu.Unlock()
tuples := r.historyByService[serviceID]
if tuples == nil {
return
}
delete(r.historyByService, serviceID)
for t := range tuples {
delete(r.history, t)
}
r.mu.Unlock()
}

View File

@ -601,7 +601,9 @@ func (u *Updater) rollbackUpdate(ctx context.Context, serviceID, message string)
return errors.New("cannot roll back service because no previous spec is available")
}
service.Spec = *service.PreviousSpec
service.SpecVersion = service.PreviousSpecVersion.Copy()
service.PreviousSpec = nil
service.PreviousSpecVersion = nil
return store.UpdateService(tx, service)
})

View File

@ -1,12 +1,24 @@
package watchapi
import (
"errors"
"sync"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)
var (
errAlreadyRunning = errors.New("broker is already running")
errNotRunning = errors.New("broker is not running")
)
// Server is the store API gRPC server.
type Server struct {
store *store.MemoryStore
store *store.MemoryStore
mu sync.Mutex
pctx context.Context
cancelAll func()
}
// NewServer creates a store API server.
@ -15,3 +27,30 @@ func NewServer(store *store.MemoryStore) *Server {
store: store,
}
}
// Start starts the watch server.
func (s *Server) Start(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.cancelAll != nil {
return errAlreadyRunning
}
s.pctx, s.cancelAll = context.WithCancel(ctx)
return nil
}
// Stop stops the watch server.
func (s *Server) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.cancelAll == nil {
return errNotRunning
}
s.cancelAll()
s.cancelAll = nil
return nil
}

View File

@ -17,6 +17,13 @@ import (
func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer) error {
ctx := stream.Context()
s.mu.Lock()
pctx := s.pctx
s.mu.Unlock()
if pctx == nil {
return errNotRunning
}
watchArgs, err := api.ConvertWatchArgs(request.Entries)
if err != nil {
return grpc.Errorf(codes.InvalidArgument, "%s", err.Error())
@ -39,6 +46,8 @@ func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer)
select {
case <-ctx.Done():
return ctx.Err()
case <-pctx.Done():
return pctx.Err()
case event := <-watch:
if commitEvent, ok := event.(state.EventCommit); ok && len(events) > 0 {
if err := stream.Send(&api.WatchMessage{Events: events, Version: commitEvent.Version}); err != nil {