From 71018bb017d17019f6c219146d6d4a8771506f0e Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Mon, 13 Jul 2015 08:55:28 -0700 Subject: [PATCH 1/2] Benchmark for pkg/pubsub package Signed-off-by: Alexander Morozov Upstream-commit: e5da4d62efe17c03ccca3c2707eb19b76afdc150 Component: engine --- .../engine/pkg/pubsub/publisher_test.go | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/components/engine/pkg/pubsub/publisher_test.go b/components/engine/pkg/pubsub/publisher_test.go index c19059a8a4..5e99e46c3d 100644 --- a/components/engine/pkg/pubsub/publisher_test.go +++ b/components/engine/pkg/pubsub/publisher_test.go @@ -1,6 +1,7 @@ package pubsub import ( + "fmt" "testing" "time" ) @@ -61,3 +62,61 @@ func TestClosePublisher(t *testing.T) { } } } + +const sampleText = "test" + +type testSubscriber struct { + dataCh chan interface{} + ch chan error +} + +func (s *testSubscriber) Wait() error { + return <-s.ch +} + +func newTestSubscriber(p *Publisher) *testSubscriber { + ts := &testSubscriber{ + dataCh: p.Subscribe(), + ch: make(chan error), + } + go func() { + for data := range ts.dataCh { + s, ok := data.(string) + if !ok { + ts.ch <- fmt.Errorf("Unexpected type %T", data) + break + } + if s != sampleText { + ts.ch <- fmt.Errorf("Unexpected text %s", s) + break + } + } + close(ts.ch) + }() + return ts +} + +func BenchmarkPubSub(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + p := NewPublisher(0, 1024) + var subs [](*testSubscriber) + for j := 0; j < 50; j++ { + subs = append(subs, newTestSubscriber(p)) + } + b.StartTimer() + for j := 0; j < 1000; j++ { + p.Publish(sampleText) + } + time.AfterFunc(1*time.Second, func() { + for _, s := range subs { + p.Evict(s.dataCh) + } + }) + for _, s := range subs { + if err := s.Wait(); err != nil { + b.Fatal(err) + } + } + } +} From 9a60e47a81e6b5f34988ed7529cf96f5fa7a6f8b Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Mon, 13 Jul 2015 09:05:47 -0700 Subject: [PATCH 2/2] Race test for pkg/pubsub package Signed-off-by: Alexander Morozov Upstream-commit: 8aa7ba731a954ffb626c2a07f524c8116ab3f212 Component: engine --- .../engine/pkg/pubsub/publisher_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/components/engine/pkg/pubsub/publisher_test.go b/components/engine/pkg/pubsub/publisher_test.go index 5e99e46c3d..d6b0a1d59a 100644 --- a/components/engine/pkg/pubsub/publisher_test.go +++ b/components/engine/pkg/pubsub/publisher_test.go @@ -96,6 +96,26 @@ func newTestSubscriber(p *Publisher) *testSubscriber { return ts } +// for testing with -race +func TestPubSubRace(t *testing.T) { + p := NewPublisher(0, 1024) + var subs [](*testSubscriber) + for j := 0; j < 50; j++ { + subs = append(subs, newTestSubscriber(p)) + } + for j := 0; j < 1000; j++ { + p.Publish(sampleText) + } + time.AfterFunc(1*time.Second, func() { + for _, s := range subs { + p.Evict(s.dataCh) + } + }) + for _, s := range subs { + s.Wait() + } +} + func BenchmarkPubSub(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer()