diff --git a/components/cli/command/container/run.go b/components/cli/command/container/run.go index d36ab610cf..2f1181659d 100644 --- a/components/cli/command/container/run.go +++ b/components/cli/command/container/run.go @@ -211,10 +211,7 @@ func runRun(dockerCli *command.DockerCli, flags *pflag.FlagSet, opts *runOptions }) } - statusChan, err := waitExitOrRemoved(dockerCli, context.Background(), createResponse.ID, hostConfig.AutoRemove) - if err != nil { - return fmt.Errorf("Error waiting container's exit code: %v", err) - } + statusChan := waitExitOrRemoved(dockerCli, ctx, createResponse.ID, hostConfig.AutoRemove) //start the container if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil { diff --git a/components/cli/command/container/start.go b/components/cli/command/container/start.go index 9f414a7c66..4c31f9bf97 100644 --- a/components/cli/command/container/start.go +++ b/components/cli/command/container/start.go @@ -108,7 +108,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error { // 3. We should open a channel for receiving status code of the container // no matter it's detached, removed on daemon side(--rm) or exit normally. - statusChan, statusErr := waitExitOrRemoved(dockerCli, context.Background(), c.ID, c.HostConfig.AutoRemove) + statusChan := waitExitOrRemoved(dockerCli, ctx, c.ID, c.HostConfig.AutoRemove) startOptions := types.ContainerStartOptions{ CheckpointID: opts.checkpoint, } @@ -117,7 +117,7 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error { if err := dockerCli.Client().ContainerStart(ctx, c.ID, startOptions); err != nil { cancelFun() <-cErr - if c.HostConfig.AutoRemove && statusErr == nil { + if c.HostConfig.AutoRemove { // wait container to be removed <-statusChan } @@ -134,10 +134,6 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error { return attchErr } - if statusErr != nil { - return fmt.Errorf("can't get container's exit code: %v", statusErr) - } - if status := <-statusChan; status != 0 { return cli.StatusError{StatusCode: status} } diff --git a/components/cli/command/container/stats.go b/components/cli/command/container/stats.go index 2bd5e3db75..394302d087 100644 --- a/components/cli/command/container/stats.go +++ b/components/cli/command/container/stats.go @@ -63,24 +63,22 @@ func runStats(dockerCli *command.DockerCli, opts *statsOptions) error { options := types.EventsOptions{ Filters: f, } - resBody, err := dockerCli.Client().Events(ctx, options) - // Whether we successfully subscribed to events or not, we can now + + eventq, errq := dockerCli.Client().Events(ctx, options) + + // Whether we successfully subscribed to eventq or not, we can now // unblock the main goroutine. close(started) - if err != nil { - closeChan <- err - return - } - defer resBody.Close() - system.DecodeEvents(resBody, func(event events.Message, err error) error { - if err != nil { + for { + select { + case event := <-eventq: + c <- event + case err := <-errq: closeChan <- err - return nil + return } - c <- event - return nil - }) + } } // waitFirst is a WaitGroup to wait first stat data's reach for each container diff --git a/components/cli/command/container/utils.go b/components/cli/command/container/utils.go index 7e895834f9..9df1d115e2 100644 --- a/components/cli/command/container/utils.go +++ b/components/cli/command/container/utils.go @@ -1,7 +1,6 @@ package container import ( - "fmt" "strconv" "golang.org/x/net/context" @@ -11,11 +10,10 @@ import ( "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/cli/command" - "github.com/docker/docker/cli/command/system" clientapi "github.com/docker/docker/client" ) -func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) (chan int, error) { +func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, containerID string, waitRemove bool) chan int { if len(containerID) == 0 { // containerID can never be empty panic("Internal Error: waitExitOrRemoved needs a containerID as parameter") @@ -24,11 +22,7 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai statusChan := make(chan int) exitCode := 125 - eventProcessor := func(e events.Message, err error) error { - if err != nil { - statusChan <- exitCode - return fmt.Errorf("failed to decode event: %v", err) - } + eventProcessor := func(e events.Message) bool { stopProcessing := false switch e.Status { @@ -53,11 +47,10 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai if stopProcessing { statusChan <- exitCode - // stop the loop processing - return fmt.Errorf("done") + return true } - return nil + return false } // Get events via Events API @@ -67,14 +60,29 @@ func waitExitOrRemoved(dockerCli *command.DockerCli, ctx context.Context, contai options := types.EventsOptions{ Filters: f, } - resBody, err := dockerCli.Client().Events(ctx, options) - if err != nil { - return nil, fmt.Errorf("can't get events from daemon: %v", err) - } - go system.DecodeEvents(resBody, eventProcessor) + eventCtx, cancel := context.WithCancel(ctx) + eventq, errq := dockerCli.Client().Events(eventCtx, options) - return statusChan, nil + go func() { + defer cancel() + + for { + select { + case evt := <-eventq: + if eventProcessor(evt) { + return + } + + case err := <-errq: + logrus.Errorf("error getting events from daemon: %v", err) + statusChan <- exitCode + return + } + } + }() + + return statusChan } // getExitCode performs an inspect on the container. It returns diff --git a/components/cli/command/system/events.go b/components/cli/command/system/events.go index f2946b8763..7b5fb592cb 100644 --- a/components/cli/command/system/events.go +++ b/components/cli/command/system/events.go @@ -63,13 +63,33 @@ func runEvents(dockerCli *command.DockerCli, opts *eventsOptions) error { Filters: opts.filter.Value(), } - responseBody, err := dockerCli.Client().Events(context.Background(), options) - if err != nil { - return err - } - defer responseBody.Close() + ctx, cancel := context.WithCancel(context.Background()) + events, errs := dockerCli.Client().Events(ctx, options) + defer cancel() - return streamEvents(dockerCli.Out(), responseBody, tmpl) + out := dockerCli.Out() + + for { + select { + case event := <-events: + if err := handleEvent(out, event, tmpl); err != nil { + return err + } + case err := <-errs: + if err == io.EOF { + return nil + } + return err + } + } +} + +func handleEvent(out io.Writer, event eventtypes.Message, tmpl *template.Template) error { + if tmpl == nil { + return prettyPrintEvent(out, event) + } + + return formatEvent(out, event, tmpl) } func makeTemplate(format string) (*template.Template, error) { @@ -85,21 +105,6 @@ func makeTemplate(format string) (*template.Template, error) { return tmpl, tmpl.Execute(ioutil.Discard, &eventtypes.Message{}) } -// streamEvents decodes prints the incoming events in the provided output. -func streamEvents(out io.Writer, input io.Reader, tmpl *template.Template) error { - return DecodeEvents(input, func(event eventtypes.Message, err error) error { - if err != nil { - return err - } - if tmpl == nil { - return prettyPrintEvent(out, event) - } - return formatEvent(out, event, tmpl) - }) -} - -type eventProcessor func(event eventtypes.Message, err error) error - // prettyPrintEvent prints all types of event information. // Each output includes the event type, actor id, name and action. // Actor attributes are printed at the end if the actor has any. diff --git a/components/cli/command/system/events_utils.go b/components/cli/command/system/events_utils.go index 71c1b0476b..b0dd909d15 100644 --- a/components/cli/command/system/events_utils.go +++ b/components/cli/command/system/events_utils.go @@ -1,14 +1,14 @@ package system import ( - "encoding/json" - "io" "sync" "github.com/Sirupsen/logrus" eventtypes "github.com/docker/docker/api/types/events" ) +type eventProcessor func(eventtypes.Message, error) error + // EventHandler is abstract interface for user to customize // own handle functions of each type of events type EventHandler interface { @@ -47,20 +47,3 @@ func (w *eventHandler) Watch(c <-chan eventtypes.Message) { go h(e) } } - -// DecodeEvents decodes event from input stream -func DecodeEvents(input io.Reader, ep eventProcessor) error { - dec := json.NewDecoder(input) - for { - var event eventtypes.Message - err := dec.Decode(&event) - if err != nil && err == io.EOF { - break - } - - if procErr := ep(event, err); procErr != nil { - return procErr - } - } - return nil -}