Merge pull request #43 from cyli/re-vendor-swarmkit

[17.06] Re-vendor swarmkit
This commit is contained in:
Andrew Hsu
2017-06-09 17:32:55 -07:00
committed by GitHub
4 changed files with 46 additions and 44 deletions

View File

@ -107,7 +107,7 @@ github.com/containerd/containerd cfb82a876ecc11b5ca0977d1733adbe58599088a
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit 4b872cfac8ffc0cc7fff434902cc05dbc7612da5
github.com/docker/swarmkit 758b59114f7eef55faca7007af773d4097540ed2
github.com/gogo/protobuf v0.4
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
github.com/google/certificate-transparency d90e65c3a07988180c5b1ece71791c0b6506826e

View File

@ -99,7 +99,7 @@ func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
if newRootCA.RootRotation != nil {
var nodes []*api.Node
r.store.View(func(tx store.ReadTx) {
nodes, err = store.FindNodes(tx, store.ByMembership(api.NodeMembershipAccepted))
nodes, err = store.FindNodes(tx, store.All)
})
if err != nil {
log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA")
@ -132,8 +132,8 @@ func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
func (r *rootRotationReconciler) UpdateNode(node *api.Node) {
r.mu.Lock()
defer r.mu.Unlock()
// if we're not in the middle of a root rotation, or if this node does not have membership, ignore it
if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil || node.Spec.Membership != api.NodeMembershipAccepted {
// if we're not in the middle of a root rotation ignore the update
if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil {
return
}
if hasIssuer(node, &r.currentIssuer) {

View File

@ -51,13 +51,6 @@ type taskBallot struct {
// allocActor controls the various phases in the lifecycle of one kind of allocator.
type allocActor struct {
// Channel through which the allocator gets all the events
// that it is interested in.
ch chan events.Event
// cancel unregisters the watcher.
cancel func()
// Task voter identity of the allocator.
taskVoter string
@ -90,7 +83,10 @@ func New(store *store.MemoryStore, pg plugingetter.PluginGetter) (*Allocator, er
func (a *Allocator) Run(ctx context.Context) error {
// Setup cancel context for all goroutines to use.
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
var (
wg sync.WaitGroup
actors []func() error
)
defer func() {
cancel()
@ -98,26 +94,8 @@ func (a *Allocator) Run(ctx context.Context) error {
close(a.doneChan)
}()
var actors []func() error
watch, watchCancel := state.Watch(a.store.WatchQueue(),
api.EventCreateNetwork{},
api.EventDeleteNetwork{},
api.EventCreateService{},
api.EventUpdateService{},
api.EventDeleteService{},
api.EventCreateTask{},
api.EventUpdateTask{},
api.EventDeleteTask{},
api.EventCreateNode{},
api.EventUpdateNode{},
api.EventDeleteNode{},
state.EventCommit{},
)
for _, aa := range []allocActor{
{
ch: watch,
cancel: watchCancel,
taskVoter: networkVoter,
init: a.doNetworkInit,
action: a.doNetworkAlloc,
@ -127,8 +105,8 @@ func (a *Allocator) Run(ctx context.Context) error {
a.registerToVote(aa.taskVoter)
}
// Copy the iterated value for variable capture.
aaCopy := aa
// Assign a pointer for variable capture
aaPtr := &aa
actor := func() error {
wg.Add(1)
defer wg.Done()
@ -136,19 +114,19 @@ func (a *Allocator) Run(ctx context.Context) error {
// init might return an allocator specific context
// which is a child of the passed in context to hold
// allocator specific state
if err := aaCopy.init(ctx); err != nil {
// Stop the watches for this allocator
// if we are failing in the init of
// this allocator.
aa.cancel()
watch, watchCancel, err := a.init(ctx, aaPtr)
if err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done()
a.run(ctx, aaCopy)
}()
go func(watch <-chan events.Event, watchCancel func()) {
defer func() {
wg.Done()
watchCancel()
}()
a.run(ctx, *aaPtr, watch)
}(watch, watchCancel)
return nil
}
@ -172,10 +150,34 @@ func (a *Allocator) Stop() {
<-a.doneChan
}
func (a *Allocator) run(ctx context.Context, aa allocActor) {
func (a *Allocator) init(ctx context.Context, aa *allocActor) (<-chan events.Event, func(), error) {
watch, watchCancel := state.Watch(a.store.WatchQueue(),
api.EventCreateNetwork{},
api.EventDeleteNetwork{},
api.EventCreateService{},
api.EventUpdateService{},
api.EventDeleteService{},
api.EventCreateTask{},
api.EventUpdateTask{},
api.EventDeleteTask{},
api.EventCreateNode{},
api.EventUpdateNode{},
api.EventDeleteNode{},
state.EventCommit{},
)
if err := aa.init(ctx); err != nil {
watchCancel()
return nil, nil, err
}
return watch, watchCancel, nil
}
func (a *Allocator) run(ctx context.Context, aa allocActor, watch <-chan events.Event) {
for {
select {
case ev, ok := <-aa.ch:
case ev, ok := <-watch:
if !ok {
return
}

View File

@ -63,7 +63,7 @@ func IsTaskDirty(s *api.Service, t *api.Task) bool {
// If the spec version matches, we know the task is not dirty. However,
// if it does not match, that doesn't mean the task is dirty, since
// only a portion of the spec is included in the comparison.
if t.SpecVersion != nil && *s.SpecVersion == *t.SpecVersion {
if t.SpecVersion != nil && s.SpecVersion != nil && *s.SpecVersion == *t.SpecVersion {
return false
}