From 1f889ca6411aacbc27e3fe46503f1bda66cae562 Mon Sep 17 00:00:00 2001 From: Tibor Vass Date: Tue, 6 Oct 2015 13:24:49 -0400 Subject: [PATCH] Move types from progressreader and broadcastwriter to broadcaster progressreader.Broadcaster becomes broadcaster.Buffered and broadcastwriter.Writer becomes broadcaster.Unbuffered. The package broadcastwriter is thus renamed to broadcaster. Signed-off-by: Tibor Vass Upstream-commit: 2391233404e2e6892c79a24f31cc99715c086b21 Component: engine --- components/engine/daemon/container.go | 10 +++--- components/engine/daemon/daemon.go | 6 ++-- components/engine/daemon/exec.go | 6 ++-- components/engine/graph/pools_test.go | 6 ++-- components/engine/graph/pull_v2.go | 3 +- components/engine/graph/tags.go | 14 ++++---- .../buffered.go} | 34 +++++++++---------- .../unbuffered.go} | 19 ++++------- .../unbuffered_test.go} | 30 ++++++++-------- 9 files changed, 62 insertions(+), 66 deletions(-) rename components/engine/pkg/{progressreader/broadcaster.go => broadcaster/buffered.go} (80%) rename components/engine/pkg/{broadcastwriter/broadcastwriter.go => broadcaster/unbuffered.go} (64%) rename components/engine/pkg/{broadcastwriter/broadcastwriter_test.go => broadcaster/unbuffered_test.go} (87%) diff --git a/components/engine/daemon/container.go b/components/engine/daemon/container.go index d3e0763c68..adff7932f0 100644 --- a/components/engine/daemon/container.go +++ b/components/engine/daemon/container.go @@ -22,7 +22,7 @@ import ( derr "github.com/docker/docker/errors" "github.com/docker/docker/image" "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/pkg/broadcastwriter" + "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/fileutils" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/mount" @@ -41,8 +41,8 @@ var ( ) type streamConfig struct { - stdout *broadcastwriter.BroadcastWriter - stderr *broadcastwriter.BroadcastWriter + stdout *broadcaster.Unbuffered + stderr *broadcaster.Unbuffered stdin io.ReadCloser stdinPipe io.WriteCloser } @@ -318,13 +318,13 @@ func (streamConfig *streamConfig) StdinPipe() io.WriteCloser { func (streamConfig *streamConfig) StdoutPipe() io.ReadCloser { reader, writer := io.Pipe() - streamConfig.stdout.AddWriter(writer) + streamConfig.stdout.Add(writer) return ioutils.NewBufReader(reader) } func (streamConfig *streamConfig) StderrPipe() io.ReadCloser { reader, writer := io.Pipe() - streamConfig.stderr.AddWriter(writer) + streamConfig.stderr.Add(writer) return ioutils.NewBufReader(reader) } diff --git a/components/engine/daemon/daemon.go b/components/engine/daemon/daemon.go index d4eca3de41..4c0e233618 100644 --- a/components/engine/daemon/daemon.go +++ b/components/engine/daemon/daemon.go @@ -32,7 +32,7 @@ import ( "github.com/docker/docker/graph" "github.com/docker/docker/image" "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/pkg/broadcastwriter" + "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/discovery" "github.com/docker/docker/pkg/fileutils" "github.com/docker/docker/pkg/graphdb" @@ -194,8 +194,8 @@ func (daemon *Daemon) Register(container *Container) error { container.daemon = daemon // Attach to stdout and stderr - container.stderr = broadcastwriter.New() - container.stdout = broadcastwriter.New() + container.stderr = new(broadcaster.Unbuffered) + container.stdout = new(broadcaster.Unbuffered) // Attach to stdin if container.Config.OpenStdin { container.stdin, container.stdinPipe = io.Pipe() diff --git a/components/engine/daemon/exec.go b/components/engine/daemon/exec.go index 99331ed257..6cc274dffb 100644 --- a/components/engine/daemon/exec.go +++ b/components/engine/daemon/exec.go @@ -10,7 +10,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" derr "github.com/docker/docker/errors" - "github.com/docker/docker/pkg/broadcastwriter" + "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/stringid" @@ -233,8 +233,8 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io. cStderr = stderr } - ec.streamConfig.stderr = broadcastwriter.New() - ec.streamConfig.stdout = broadcastwriter.New() + ec.streamConfig.stderr = new(broadcaster.Unbuffered) + ec.streamConfig.stdout = new(broadcaster.Unbuffered) // Attach to stdin if ec.OpenStdin { ec.streamConfig.stdin, ec.streamConfig.stdinPipe = io.Pipe() diff --git a/components/engine/graph/pools_test.go b/components/engine/graph/pools_test.go index a7b27271b7..6382c15596 100644 --- a/components/engine/graph/pools_test.go +++ b/components/engine/graph/pools_test.go @@ -3,7 +3,7 @@ package graph import ( "testing" - "github.com/docker/docker/pkg/progressreader" + "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/reexec" ) @@ -13,8 +13,8 @@ func init() { func TestPools(t *testing.T) { s := &TagStore{ - pullingPool: make(map[string]*progressreader.Broadcaster), - pushingPool: make(map[string]*progressreader.Broadcaster), + pullingPool: make(map[string]*broadcaster.Buffered), + pushingPool: make(map[string]*broadcaster.Buffered), } if _, found := s.poolAdd("pull", "test1"); found { diff --git a/components/engine/graph/pull_v2.go b/components/engine/graph/pull_v2.go index 1cb9d5d91d..eceb70f152 100644 --- a/components/engine/graph/pull_v2.go +++ b/components/engine/graph/pull_v2.go @@ -11,6 +11,7 @@ import ( "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest" "github.com/docker/docker/image" + "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/stringid" @@ -110,7 +111,7 @@ type downloadInfo struct { size int64 err chan error poolKey string - broadcaster *progressreader.Broadcaster + broadcaster *broadcaster.Buffered } type errVerification struct{} diff --git a/components/engine/graph/tags.go b/components/engine/graph/tags.go index 97737f4b9c..87cbbc5ba2 100644 --- a/components/engine/graph/tags.go +++ b/components/engine/graph/tags.go @@ -16,8 +16,8 @@ import ( "github.com/docker/docker/daemon/events" "github.com/docker/docker/graph/tags" "github.com/docker/docker/image" + "github.com/docker/docker/pkg/broadcaster" "github.com/docker/docker/pkg/parsers" - "github.com/docker/docker/pkg/progressreader" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/registry" "github.com/docker/docker/trust" @@ -37,8 +37,8 @@ type TagStore struct { sync.Mutex // FIXME: move push/pull-related fields // to a helper type - pullingPool map[string]*progressreader.Broadcaster - pushingPool map[string]*progressreader.Broadcaster + pullingPool map[string]*broadcaster.Buffered + pushingPool map[string]*broadcaster.Buffered registryService *registry.Service eventsService *events.Events trustService *trust.Store @@ -94,8 +94,8 @@ func NewTagStore(path string, cfg *TagStoreConfig) (*TagStore, error) { graph: cfg.Graph, trustKey: cfg.Key, Repositories: make(map[string]Repository), - pullingPool: make(map[string]*progressreader.Broadcaster), - pushingPool: make(map[string]*progressreader.Broadcaster), + pullingPool: make(map[string]*broadcaster.Buffered), + pushingPool: make(map[string]*broadcaster.Buffered), registryService: cfg.Registry, eventsService: cfg.Events, trustService: cfg.Trust, @@ -437,7 +437,7 @@ func validateDigest(dgst string) error { // poolAdd checks if a push or pull is already running, and returns // (broadcaster, true) if a running operation is found. Otherwise, it creates a // new one and returns (broadcaster, false). -func (store *TagStore) poolAdd(kind, key string) (*progressreader.Broadcaster, bool) { +func (store *TagStore) poolAdd(kind, key string) (*broadcaster.Buffered, bool) { store.Lock() defer store.Unlock() @@ -448,7 +448,7 @@ func (store *TagStore) poolAdd(kind, key string) (*progressreader.Broadcaster, b return p, true } - broadcaster := progressreader.NewBroadcaster() + broadcaster := broadcaster.NewBuffered() switch kind { case "pull": diff --git a/components/engine/pkg/progressreader/broadcaster.go b/components/engine/pkg/broadcaster/buffered.go similarity index 80% rename from components/engine/pkg/progressreader/broadcaster.go rename to components/engine/pkg/broadcaster/buffered.go index a48ff226db..57f5f97862 100644 --- a/components/engine/pkg/progressreader/broadcaster.go +++ b/components/engine/pkg/broadcaster/buffered.go @@ -1,4 +1,4 @@ -package progressreader +package broadcaster import ( "errors" @@ -6,10 +6,10 @@ import ( "sync" ) -// Broadcaster keeps track of one or more observers watching the progress +// Buffered keeps track of one or more observers watching the progress // of an operation. For example, if multiple clients are trying to pull an -// image, they share a Broadcaster for the download operation. -type Broadcaster struct { +// image, they share a Buffered struct for the download operation. +type Buffered struct { sync.Mutex // c is a channel that observers block on, waiting for the operation // to finish. @@ -29,9 +29,9 @@ type Broadcaster struct { result error } -// NewBroadcaster returns a Broadcaster structure -func NewBroadcaster() *Broadcaster { - b := &Broadcaster{ +// NewBuffered returns an initialized Buffered structure. +func NewBuffered() *Buffered { + b := &Buffered{ c: make(chan struct{}), } b.cond = sync.NewCond(b) @@ -39,7 +39,7 @@ func NewBroadcaster() *Broadcaster { } // closed returns true if and only if the broadcaster has been closed -func (broadcaster *Broadcaster) closed() bool { +func (broadcaster *Buffered) closed() bool { select { case <-broadcaster.c: return true @@ -51,7 +51,7 @@ func (broadcaster *Broadcaster) closed() bool { // receiveWrites runs as a goroutine so that writes don't block the Write // function. It writes the new data in broadcaster.history each time there's // activity on the broadcaster.cond condition variable. -func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { +func (broadcaster *Buffered) receiveWrites(observer io.Writer) { n := 0 broadcaster.Lock() @@ -98,13 +98,13 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { // Write adds data to the history buffer, and also writes it to all current // observers. -func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) { +func (broadcaster *Buffered) Write(p []byte) (n int, err error) { broadcaster.Lock() defer broadcaster.Unlock() // Is the broadcaster closed? If so, the write should fail. if broadcaster.closed() { - return 0, errors.New("attempted write to closed progressreader Broadcaster") + return 0, errors.New("attempted write to a closed broadcaster.Buffered") } // Add message in p to the history slice @@ -117,15 +117,15 @@ func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) { return len(p), nil } -// Add adds an observer to the Broadcaster. The new observer receives the +// Add adds an observer to the broadcaster. The new observer receives the // data from the history buffer, and also all subsequent data. -func (broadcaster *Broadcaster) Add(w io.Writer) error { +func (broadcaster *Buffered) Add(w io.Writer) error { // The lock is acquired here so that Add can't race with Close broadcaster.Lock() defer broadcaster.Unlock() if broadcaster.closed() { - return errors.New("attempted to add observer to closed progressreader Broadcaster") + return errors.New("attempted to add observer to a closed broadcaster.Buffered") } broadcaster.wg.Add(1) @@ -136,7 +136,7 @@ func (broadcaster *Broadcaster) Add(w io.Writer) error { // CloseWithError signals to all observers that the operation has finished. Its // argument is a result that should be returned to waiters blocking on Wait. -func (broadcaster *Broadcaster) CloseWithError(result error) { +func (broadcaster *Buffered) CloseWithError(result error) { broadcaster.Lock() if broadcaster.closed() { broadcaster.Unlock() @@ -153,14 +153,14 @@ func (broadcaster *Broadcaster) CloseWithError(result error) { // Close signals to all observers that the operation has finished. It causes // all calls to Wait to return nil. -func (broadcaster *Broadcaster) Close() { +func (broadcaster *Buffered) Close() { broadcaster.CloseWithError(nil) } // Wait blocks until the operation is marked as completed by the Close method, // and all writer goroutines have completed. It returns the argument that was // passed to Close. -func (broadcaster *Broadcaster) Wait() error { +func (broadcaster *Buffered) Wait() error { <-broadcaster.c broadcaster.wg.Wait() return broadcaster.result diff --git a/components/engine/pkg/broadcastwriter/broadcastwriter.go b/components/engine/pkg/broadcaster/unbuffered.go similarity index 64% rename from components/engine/pkg/broadcastwriter/broadcastwriter.go rename to components/engine/pkg/broadcaster/unbuffered.go index e49810c65b..784d65d6fe 100644 --- a/components/engine/pkg/broadcastwriter/broadcastwriter.go +++ b/components/engine/pkg/broadcaster/unbuffered.go @@ -1,18 +1,18 @@ -package broadcastwriter +package broadcaster import ( "io" "sync" ) -// BroadcastWriter accumulate multiple io.WriteCloser by stream. -type BroadcastWriter struct { +// Unbuffered accumulates multiple io.WriteCloser by stream. +type Unbuffered struct { mu sync.Mutex writers []io.WriteCloser } -// AddWriter adds new io.WriteCloser. -func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) { +// Add adds new io.WriteCloser. +func (w *Unbuffered) Add(writer io.WriteCloser) { w.mu.Lock() w.writers = append(w.writers, writer) w.mu.Unlock() @@ -20,7 +20,7 @@ func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) { // Write writes bytes to all writers. Failed writers will be evicted during // this call. -func (w *BroadcastWriter) Write(p []byte) (n int, err error) { +func (w *Unbuffered) Write(p []byte) (n int, err error) { w.mu.Lock() var evict []int for i, sw := range w.writers { @@ -38,7 +38,7 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) { // Clean closes and removes all writers. Last non-eol-terminated part of data // will be saved. -func (w *BroadcastWriter) Clean() error { +func (w *Unbuffered) Clean() error { w.mu.Lock() for _, sw := range w.writers { sw.Close() @@ -47,8 +47,3 @@ func (w *BroadcastWriter) Clean() error { w.mu.Unlock() return nil } - -// New creates a new BroadcastWriter. -func New() *BroadcastWriter { - return &BroadcastWriter{} -} diff --git a/components/engine/pkg/broadcastwriter/broadcastwriter_test.go b/components/engine/pkg/broadcaster/unbuffered_test.go similarity index 87% rename from components/engine/pkg/broadcastwriter/broadcastwriter_test.go rename to components/engine/pkg/broadcaster/unbuffered_test.go index 1ff4caea05..9f8e72bc0f 100644 --- a/components/engine/pkg/broadcastwriter/broadcastwriter_test.go +++ b/components/engine/pkg/broadcaster/unbuffered_test.go @@ -1,4 +1,4 @@ -package broadcastwriter +package broadcaster import ( "bytes" @@ -28,14 +28,14 @@ func (dw *dummyWriter) Close() error { return nil } -func TestBroadcastWriter(t *testing.T) { - writer := New() +func TestUnbuffered(t *testing.T) { + writer := new(Unbuffered) // Test 1: Both bufferA and bufferB should contain "foo" bufferA := &dummyWriter{} - writer.AddWriter(bufferA) + writer.Add(bufferA) bufferB := &dummyWriter{} - writer.AddWriter(bufferB) + writer.Add(bufferB) writer.Write([]byte("foo")) if bufferA.String() != "foo" { @@ -49,7 +49,7 @@ func TestBroadcastWriter(t *testing.T) { // Test2: bufferA and bufferB should contain "foobar", // while bufferC should only contain "bar" bufferC := &dummyWriter{} - writer.AddWriter(bufferC) + writer.Add(bufferC) writer.Write([]byte("bar")) if bufferA.String() != "foobar" { @@ -87,7 +87,7 @@ func TestBroadcastWriter(t *testing.T) { bufferB.failOnWrite = true bufferC.failOnWrite = true bufferD := &dummyWriter{} - writer.AddWriter(bufferD) + writer.Add(bufferD) writer.Write([]byte("yo")) writer.Write([]byte("ink")) if strings.Contains(bufferB.String(), "yoink") { @@ -114,24 +114,24 @@ func (d devNullCloser) Write(buf []byte) (int, error) { } // This test checks for races. It is only useful when run with the race detector. -func TestRaceBroadcastWriter(t *testing.T) { - writer := New() +func TestRaceUnbuffered(t *testing.T) { + writer := new(Unbuffered) c := make(chan bool) go func() { - writer.AddWriter(devNullCloser(0)) + writer.Add(devNullCloser(0)) c <- true }() writer.Write([]byte("hello")) <-c } -func BenchmarkBroadcastWriter(b *testing.B) { - writer := New() +func BenchmarkUnbuffered(b *testing.B) { + writer := new(Unbuffered) setUpWriter := func() { for i := 0; i < 100; i++ { - writer.AddWriter(devNullCloser(0)) - writer.AddWriter(devNullCloser(0)) - writer.AddWriter(devNullCloser(0)) + writer.Add(devNullCloser(0)) + writer.Add(devNullCloser(0)) + writer.Add(devNullCloser(0)) } } testLine := "Line that thinks that it is log line from docker"