From 6bc65656e83f95f92c37447c3e603abc187a9d10 Mon Sep 17 00:00:00 2001 From: Vincent Batts Date: Tue, 15 Sep 2015 14:28:22 -0400 Subject: [PATCH 1/2] daemon/events: let Log be [slightly] blocking With go1.5's concurrency, the use of a goroutine in Log'ing events was causing the resulting events to not be in order. Signed-off-by: Vincent Batts Upstream-commit: 09e7dd03f8740d7865ea91c56a231ce9ae9bec82 Component: engine --- components/engine/daemon/events/events.go | 24 +++++++++++------------ 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/components/engine/daemon/events/events.go b/components/engine/daemon/events/events.go index 07ee29a346..6687e2f155 100644 --- a/components/engine/daemon/events/events.go +++ b/components/engine/daemon/events/events.go @@ -45,19 +45,17 @@ func (e *Events) Evict(l chan interface{}) { // Log broadcasts event to listeners. Each listener has 100 millisecond for // receiving event or it will be skipped. func (e *Events) Log(action, id, from string) { - go func() { - e.mu.Lock() - jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()} - if len(e.events) == cap(e.events) { - // discard oldest event - copy(e.events, e.events[1:]) - e.events[len(e.events)-1] = jm - } else { - e.events = append(e.events, jm) - } - e.mu.Unlock() - e.pub.Publish(jm) - }() + jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()} + e.mu.Lock() + if len(e.events) == cap(e.events) { + // discard oldest event + copy(e.events, e.events[1:]) + e.events[len(e.events)-1] = jm + } else { + e.events = append(e.events, jm) + } + e.mu.Unlock() + e.pub.Publish(jm) } // SubscribersCount returns number of event listeners From 5aa203fb958304954096c044ba0facdf2038d320 Mon Sep 17 00:00:00 2001 From: Vincent Batts Date: Tue, 15 Sep 2015 15:53:13 -0400 Subject: [PATCH 2/2] Revert "Make events test more deterministic in go1.5" This reverts commit 386aefb9fcf65f1a5f68fd61e5108ea0858d2d3f. Signed-off-by: Vincent Batts Upstream-commit: fc77ea787e33864284c14ca743bf3ebc0f50c249 Component: engine --- .../engine/daemon/events/events_test.go | 63 ++++++++++++------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/components/engine/daemon/events/events_test.go b/components/engine/daemon/events/events_test.go index 73432478fc..7aa8d9facc 100644 --- a/components/engine/daemon/events/events_test.go +++ b/components/engine/daemon/events/events_test.go @@ -1,6 +1,7 @@ package events import ( + "fmt" "testing" "time" @@ -80,39 +81,55 @@ func TestEventsLogTimeout(t *testing.T) { } } -func TestEventsCap(t *testing.T) { +func TestLogEvents(t *testing.T) { e := New() - for i := 0; i < eventsLimit+1; i++ { - e.Log("action", "id", "from") - } - // let all events go through - time.Sleep(1 * time.Second) + for i := 0; i < eventsLimit+16; i++ { + action := fmt.Sprintf("action_%d", i) + id := fmt.Sprintf("cont_%d", i) + from := fmt.Sprintf("image_%d", i) + e.Log(action, id, from) + } + time.Sleep(50 * time.Millisecond) current, l := e.Subscribe() - if len(current) != eventsLimit { - t.Fatalf("Must be %d events, got %d", eventsLimit, len(current)) + for i := 0; i < 10; i++ { + num := i + eventsLimit + 16 + action := fmt.Sprintf("action_%d", num) + id := fmt.Sprintf("cont_%d", num) + from := fmt.Sprintf("image_%d", num) + e.Log(action, id, from) } if len(e.events) != eventsLimit { t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events)) } - for i := 0; i < 10; i++ { - e.Log("action", "id", "from") - } - // let all events go through - time.Sleep(1 * time.Second) - var msgs []*jsonmessage.JSONMessage for len(msgs) < 10 { - select { - case m := <-l: - jm, ok := (m).(*jsonmessage.JSONMessage) - if !ok { - t.Fatalf("Unexpected type %T", m) - } - msgs = append(msgs, jm) - default: - t.Fatalf("There is no enough events in channel") + m := <-l + jm, ok := (m).(*jsonmessage.JSONMessage) + if !ok { + t.Fatalf("Unexpected type %T", m) } + msgs = append(msgs, jm) + } + if len(current) != eventsLimit { + t.Fatalf("Must be %d events, got %d", eventsLimit, len(current)) + } + first := current[0] + if first.Status != "action_16" { + t.Fatalf("First action is %s, must be action_16", first.Status) + } + last := current[len(current)-1] + if last.Status != "action_79" { + t.Fatalf("Last action is %s, must be action_79", last.Status) + } + + firstC := msgs[0] + if firstC.Status != "action_80" { + t.Fatalf("First action is %s, must be action_80", firstC.Status) + } + lastC := msgs[len(msgs)-1] + if lastC.Status != "action_89" { + t.Fatalf("Last action is %s, must be action_89", lastC.Status) } }