Merge pull request #134 from cyli/re-vendor-swarmkit

[17.06] Re-vendor swarmkit for various fixes
This commit is contained in:
Andrew Hsu
2017-07-26 16:11:21 -07:00
committed by GitHub
6 changed files with 89 additions and 26 deletions

View File

@ -108,7 +108,7 @@ github.com/containerd/containerd 6e23458c129b551d5c9871e5174f6b1b7f6d1170 https:
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit fb828cea0ec518dadea0f04900e0057e38194562
github.com/docker/swarmkit a0a7f6f663c35c92ddcd73e2c1b97b0f4ed8caf3
github.com/gogo/protobuf v0.4
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e

View File

@ -57,12 +57,12 @@ func New(store *store.MemoryStore) *LogBroker {
}
}
// Run the log broker
func (lb *LogBroker) Run(ctx context.Context) error {
// Start starts the log broker
func (lb *LogBroker) Start(ctx context.Context) error {
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.cancelAll != nil {
lb.mu.Unlock()
return errAlreadyRunning
}
@ -71,12 +71,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
lb.subscriptionQueue = watch.NewQueue()
lb.registeredSubscriptions = make(map[string]*subscription)
lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
lb.mu.Unlock()
select {
case <-lb.pctx.Done():
return lb.pctx.Err()
}
return nil
}
// Stop stops the log broker
@ -234,8 +229,15 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
return err
}
lb.mu.Lock()
pctx := lb.pctx
lb.mu.Unlock()
if pctx == nil {
return errNotRunning
}
subscription := lb.newSubscription(request.Selector, request.Options)
subscription.Run(lb.pctx)
subscription.Run(pctx)
defer subscription.Stop()
log := log.G(ctx).WithFields(
@ -257,8 +259,8 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
select {
case <-ctx.Done():
return ctx.Err()
case <-lb.pctx.Done():
return lb.pctx.Err()
case <-pctx.Done():
return pctx.Err()
case event := <-publishCh:
publish := event.(*logMessage)
if publish.completed {
@ -308,6 +310,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
return err
}
lb.mu.Lock()
pctx := lb.pctx
lb.mu.Unlock()
if pctx == nil {
return errNotRunning
}
lb.nodeConnected(remote.NodeID)
defer lb.nodeDisconnected(remote.NodeID)
@ -329,7 +338,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
select {
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
case <-pctx.Done():
return nil
default:
}
@ -362,7 +371,7 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-lb.pctx.Done():
case <-pctx.Done():
return nil
}
}

View File

@ -129,6 +129,7 @@ type Manager struct {
caserver *ca.Server
dispatcher *dispatcher.Dispatcher
logbroker *logbroker.LogBroker
watchServer *watchapi.Server
replicatedOrchestrator *replicated.Orchestrator
globalOrchestrator *global.Orchestrator
taskReaper *taskreaper.TaskReaper
@ -220,6 +221,7 @@ func New(config *Config) (*Manager, error) {
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()),
logbroker: logbroker.New(raftNode.MemoryStore()),
watchServer: watchapi.NewServer(raftNode.MemoryStore()),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
raftNode: raftNode,
@ -397,13 +399,12 @@ func (m *Manager) Run(parent context.Context) error {
}
baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.caserver, m.config.PluginGetter)
baseWatchAPI := watchapi.NewServer(m.raftNode.MemoryStore())
baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
healthServer := health.NewHealthServer()
localHealthServer := health.NewHealthServer()
authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(baseWatchAPI, authorize)
authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
@ -476,7 +477,7 @@ func (m *Manager) Run(parent context.Context) error {
grpc_prometheus.Register(m.server)
api.RegisterControlServer(m.localserver, localProxyControlAPI)
api.RegisterWatchServer(m.localserver, baseWatchAPI)
api.RegisterWatchServer(m.localserver, m.watchServer)
api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
api.RegisterHealthServer(m.localserver, localHealthServer)
api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
@ -489,6 +490,10 @@ func (m *Manager) Run(parent context.Context) error {
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)
if err := m.watchServer.Start(ctx); err != nil {
log.G(ctx).WithError(err).Error("watch server failed to start")
}
go m.serveListener(ctx, m.remoteListener)
go m.serveListener(ctx, m.controlListener)
@ -564,8 +569,8 @@ func (m *Manager) Run(parent context.Context) error {
const stopTimeout = 8 * time.Second
// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the scheduler. If clearData is set, the
// raft logs, snapshots, and keys will be erased.
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
log.G(ctx).Info("Stopping manager")
// It's not safe to start shutting down while the manager is still
@ -599,6 +604,7 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {
m.dispatcher.Stop()
m.logbroker.Stop()
m.watchServer.Stop()
m.caserver.Stop()
if m.allocator != nil {
@ -1000,11 +1006,9 @@ func (m *Manager) becomeLeader(ctx context.Context) {
}
}(m.dispatcher)
go func(lb *logbroker.LogBroker) {
if err := lb.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("LogBroker exited with an error")
}
}(m.logbroker)
if err := m.logbroker.Start(ctx); err != nil {
log.G(ctx).WithError(err).Error("LogBroker failed to start")
}
go func(server *ca.Server) {
if err := server.Run(ctx); err != nil {

View File

@ -601,7 +601,9 @@ func (u *Updater) rollbackUpdate(ctx context.Context, serviceID, message string)
return errors.New("cannot roll back service because no previous spec is available")
}
service.Spec = *service.PreviousSpec
service.SpecVersion = service.PreviousSpecVersion.Copy()
service.PreviousSpec = nil
service.PreviousSpecVersion = nil
return store.UpdateService(tx, service)
})

View File

@ -1,12 +1,24 @@
package watchapi
import (
"errors"
"sync"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)
var (
errAlreadyRunning = errors.New("broker is already running")
errNotRunning = errors.New("broker is not running")
)
// Server is the store API gRPC server.
type Server struct {
store *store.MemoryStore
store *store.MemoryStore
mu sync.Mutex
pctx context.Context
cancelAll func()
}
// NewServer creates a store API server.
@ -15,3 +27,30 @@ func NewServer(store *store.MemoryStore) *Server {
store: store,
}
}
// Start starts the watch server.
func (s *Server) Start(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.cancelAll != nil {
return errAlreadyRunning
}
s.pctx, s.cancelAll = context.WithCancel(ctx)
return nil
}
// Stop stops the watch server.
func (s *Server) Stop() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.cancelAll == nil {
return errNotRunning
}
s.cancelAll()
s.cancelAll = nil
return nil
}

View File

@ -17,6 +17,13 @@ import (
func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer) error {
ctx := stream.Context()
s.mu.Lock()
pctx := s.pctx
s.mu.Unlock()
if pctx == nil {
return errNotRunning
}
watchArgs, err := api.ConvertWatchArgs(request.Entries)
if err != nil {
return grpc.Errorf(codes.InvalidArgument, "%s", err.Error())
@ -39,6 +46,8 @@ func (s *Server) Watch(request *api.WatchRequest, stream api.Watch_WatchServer)
select {
case <-ctx.Done():
return ctx.Err()
case <-pctx.Done():
return pctx.Err()
case event := <-watch:
if commitEvent, ok := event.(state.EventCommit); ok && len(events) > 0 {
if err := stream.Send(&api.WatchMessage{Events: events, Version: commitEvent.Version}); err != nil {