From f7fc21bc5d30172f23026e51d13c97b0fe6a7b09 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Mon, 1 Apr 2013 23:52:20 -0700 Subject: [PATCH 1/7] Refactored CmdRun and CmdAttach to use Container.Attach Upstream-commit: c808940c041ca8c7f9f9c5e0fa93e37ebfdd412a Component: engine --- components/engine/commands.go | 110 +++++---------------------------- components/engine/container.go | 81 ++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 93 deletions(-) diff --git a/components/engine/commands.go b/components/engine/commands.go index 0c896f29ba..ee481fd282 100644 --- a/components/engine/commands.go +++ b/components/engine/commands.go @@ -13,7 +13,6 @@ import ( "runtime" "strconv" "strings" - "sync" "text/tabwriter" "time" "unicode" @@ -533,6 +532,7 @@ func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string return nil } + // FIXME: CmdPull should be a wrapper around Runtime.Pull() if srv.runtime.graph.LookupRemoteImage(remote, srv.runtime.authConfig) { if err := srv.runtime.graph.PullImage(stdout, remote, srv.runtime.authConfig); err != nil { return err @@ -796,56 +796,7 @@ func (srv *Server) CmdAttach(stdin io.ReadCloser, stdout io.Writer, args ...stri if container == nil { return fmt.Errorf("No such container: %s", name) } - - cStdout, err := container.StdoutPipe() - if err != nil { - return err - } - cStderr, err := container.StderrPipe() - if err != nil { - return err - } - - var wg sync.WaitGroup - if container.Config.OpenStdin { - cStdin, err := container.StdinPipe() - if err != nil { - return err - } - wg.Add(1) - go func() { - Debugf("Begin stdin pipe [attach]") - io.Copy(cStdin, stdin) - - // When stdin get closed, it means the client has been detached - // Make sure all pipes are closed. - if err := cStdout.Close(); err != nil { - Debugf("Error closing stdin pipe: %s", err) - } - if err := cStderr.Close(); err != nil { - Debugf("Error closing stderr pipe: %s", err) - } - - wg.Add(-1) - Debugf("End of stdin pipe [attach]") - }() - } - wg.Add(1) - go func() { - Debugf("Begin stdout pipe [attach]") - io.Copy(stdout, cStdout) - wg.Add(-1) - Debugf("End of stdout pipe [attach]") - }() - wg.Add(1) - go func() { - Debugf("Begin stderr pipe [attach]") - io.Copy(stdout, cStderr) - wg.Add(-1) - Debugf("End of stderr pipe [attach]") - }() - wg.Wait() - return nil + return <-container.Attach(stdin, stdout, stdout) } // Ports type - Used to parse multiple -p flags @@ -919,55 +870,28 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) return err } } - if config.OpenStdin { - cmdStdin, err := container.StdinPipe() - if err != nil { - return err - } - if !config.Detach { - Go(func() error { - _, err := io.Copy(cmdStdin, stdin) - cmdStdin.Close() - return err - }) - } - } // Run the container if !config.Detach { - cmdStderr, err := container.StderrPipe() - if err != nil { - return err - } - cmdStdout, err := container.StdoutPipe() - if err != nil { - return err + var attachErr chan error + if config.OpenStdin { + config.StdinOnce = true + Debugf("Attaching with stdin\n") + attachErr = container.Attach(stdin, stdout, stdout) + } else { + Debugf("Attaching without stdin\n") + attachErr = container.Attach(nil, stdout, nil) } + Debugf("Starting\n") if err := container.Start(); err != nil { return err } - sendingStdout := Go(func() error { - _, err := io.Copy(stdout, cmdStdout) - return err - }) - sendingStderr := Go(func() error { - _, err := io.Copy(stdout, cmdStderr) - return err - }) - errSendingStdout := <-sendingStdout - errSendingStderr := <-sendingStderr - if errSendingStdout != nil { - return errSendingStdout - } - if errSendingStderr != nil { - return errSendingStderr - } - container.Wait() - } else { - if err := container.Start(); err != nil { - return err - } - fmt.Fprintln(stdout, container.ShortId()) + Debugf("Waiting for attach to return\n") + return <-attachErr } + if err := container.Start(); err != nil { + return err + } + fmt.Fprintln(stdout, container.ShortId()) return nil } diff --git a/components/engine/container.go b/components/engine/container.go index 8f993a35e1..4f320f32fd 100644 --- a/components/engine/container.go +++ b/components/engine/container.go @@ -56,6 +56,7 @@ type Config struct { Ports []int Tty bool // Attach standard streams to a tty, including stdin if it is not closed. OpenStdin bool // Open stdin + StdinOnce bool // If true, close stdin after the 1 attached client disconnects. Env []string Cmd []string Image string // Name of the image as it was passed by the operator (eg. could be symbolic) @@ -229,6 +230,86 @@ func (container *Container) start() error { return container.cmd.Start() } +func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.Writer) chan error { + var cStdout io.ReadCloser + var cStderr io.ReadCloser + var nJobs int + errors := make(chan error, 3) + if stdin != nil && container.Config.OpenStdin { + nJobs += 1 + if cStdin, err := container.StdinPipe(); err != nil { + errors <- err + } else { + go func() { + Debugf("[start] attach stdin\n") + defer Debugf("[end] attach stdin\n") + if container.Config.StdinOnce { + defer cStdin.Close() + } + _, err := io.Copy(cStdin, stdin) + if err != nil { + Debugf("[error] attach stdout: %s\n", err) + } + errors <- err + }() + } + } + if stdout != nil { + nJobs += 1 + if p, err := container.StdoutPipe(); err != nil { + errors <- err + } else { + cStdout = p + go func() { + Debugf("[start] attach stdout\n") + defer Debugf("[end] attach stdout\n") + _, err := io.Copy(stdout, cStdout) + if err != nil { + Debugf("[error] attach stdout: %s\n", err) + } + errors <- err + }() + } + } + if stderr != nil { + nJobs += 1 + if p, err := container.StderrPipe(); err != nil { + errors <- err + } else { + cStderr = p + go func() { + Debugf("[start] attach stderr\n") + defer Debugf("[end] attach stderr\n") + _, err := io.Copy(stderr, cStderr) + if err != nil { + Debugf("[error] attach stderr: %s\n", err) + } + errors <- err + }() + } + } + return Go(func() error { + if cStdout != nil { + defer cStdout.Close() + } + if cStderr != nil { + defer cStderr.Close() + } + // FIXME: how do clean up the stdin goroutine without the unwanted side effect + // of closing the passed stdin? Add an intermediary io.Pipe? + for i := 0; i < nJobs; i += 1 { + Debugf("Waiting for job %d/%d\n", i+1, nJobs) + if err := <-errors; err != nil { + Debugf("Job %d returned error %s. Aborting all jobs\n", i+1, err) + return err + } + Debugf("Job %d completed successfully\n", i+1) + } + Debugf("All jobs completed successfully\n") + return nil + }) +} + func (container *Container) Start() error { if container.State.Running { return fmt.Errorf("The container %s is already running.", container.Id) From 138633b4abbb6c6c7b6dba0599398634174c2648 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 2 Apr 2013 11:00:14 -0700 Subject: [PATCH 2/7] [unit tests] Cleanly kill all containers before nuking a temporary runtime Upstream-commit: 8f9e4542417421ee62ac952d1e04268520a75d8f Component: engine --- components/engine/runtime_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/components/engine/runtime_test.go b/components/engine/runtime_test.go index b8a6c429c9..d8e160fc0e 100644 --- a/components/engine/runtime_test.go +++ b/components/engine/runtime_test.go @@ -6,6 +6,7 @@ import ( "os" "os/exec" "os/user" + "sync" "testing" ) @@ -17,6 +18,15 @@ var unitTestStoreBase string var srv *Server func nuke(runtime *Runtime) error { + var wg sync.WaitGroup + for _, container := range runtime.List() { + wg.Add(1) + go func() { + container.Kill() + wg.Add(-1) + }() + } + wg.Wait() return os.RemoveAll(runtime.root) } From cd9062641b0d4ada12038f80e415d0ba2eb5912d Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 2 Apr 2013 11:02:19 -0700 Subject: [PATCH 3/7] Activate Config.StdinOnce at argument parsing Upstream-commit: aea2675f7b2e4aa8bf63c8472af40bc059bd5d14 Component: engine --- components/engine/commands.go | 1 - components/engine/container.go | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/components/engine/commands.go b/components/engine/commands.go index ee481fd282..299ae0e4f5 100644 --- a/components/engine/commands.go +++ b/components/engine/commands.go @@ -874,7 +874,6 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) if !config.Detach { var attachErr chan error if config.OpenStdin { - config.StdinOnce = true Debugf("Attaching with stdin\n") attachErr = container.Attach(stdin, stdout, stdout) } else { diff --git a/components/engine/container.go b/components/engine/container.go index 4f320f32fd..a11539180e 100644 --- a/components/engine/container.go +++ b/components/engine/container.go @@ -101,6 +101,10 @@ func ParseRun(args []string, stdout io.Writer) (*Config, error) { Cmd: runCmd, Image: image, } + // When allocating stdin in attached mode, close stdin at client disconnect + if config.OpenStdin && !config.Detach { + config.StdinOnce = true + } return config, nil } From 8a9e7f42f0910bb73518ddd5bc00ebf249b30aec Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 2 Apr 2013 11:06:42 -0700 Subject: [PATCH 4/7] When simulating disconnects in the tests, make sure that the command returns - but don't check for a specific return value Upstream-commit: 1cc1cb099ed8fb0e4fc47debe38a33d17e211a65 Component: engine --- components/engine/commands_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/components/engine/commands_test.go b/components/engine/commands_test.go index ebbe88c637..27b6605abb 100644 --- a/components/engine/commands_test.go +++ b/components/engine/commands_test.go @@ -71,9 +71,9 @@ func TestRunDisconnect(t *testing.T) { stdout, stdoutPipe := io.Pipe() c1 := make(chan struct{}) go func() { - if err := srv.CmdRun(stdin, stdoutPipe, "-i", GetTestImage(runtime).Id, "/bin/cat"); err != nil { - t.Fatal(err) - } + // We're simulating a disconnect so the return value doesn't matter. What matters is the + // fact that CmdRun returns. + srv.CmdRun(stdin, stdoutPipe, "-i", GetTestImage(runtime).Id, "/bin/cat") close(c1) }() @@ -135,9 +135,9 @@ func TestAttachDisconnect(t *testing.T) { // Attach to it c1 := make(chan struct{}) go func() { - if err := srv.CmdAttach(stdin, stdoutPipe, container.Id); err != nil { - t.Fatal(err) - } + // We're simulating a disconnect so the return value doesn't matter. What matters is the + // fact that CmdAttach returns. + srv.CmdAttach(stdin, stdoutPipe, container.Id) close(c1) }() From 5597e432afcd433db2365acf66d2ef9e29a82bfa Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 2 Apr 2013 11:07:49 -0700 Subject: [PATCH 5/7] 'docker run' in attached mode no longer waits for the process to exit. Take this into account in the tests. Upstream-commit: 2db358146f8bc0f63d840c8e0227bcbe7a8e3063 Component: engine --- components/engine/commands_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/components/engine/commands_test.go b/components/engine/commands_test.go index 27b6605abb..734cefa278 100644 --- a/components/engine/commands_test.go +++ b/components/engine/commands_test.go @@ -94,11 +94,15 @@ func TestRunDisconnect(t *testing.T) { <-c1 }) - // Check the status of the container - container := runtime.containers.Back().Value.(*Container) - if container.State.Running { - t.Fatalf("/bin/cat is still running after closing stdin") - } + // Client disconnect after run -i should cause stdin to be closed, which should + // cause /bin/cat to exit. + setTimeout(t, "Waiting for /bin/cat to exit timed out", 2*time.Second, func() { + container := runtime.List()[0] + container.Wait() + if container.State.Running { + t.Fatalf("/bin/cat is still running after closing stdin") + } + }) } // Expected behaviour, the process stays alive when the client disconnects From 506ecddd217e2f4af02cf4dbba633540867b0c96 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 2 Apr 2013 18:05:19 -0700 Subject: [PATCH 6/7] Add test for attaching only stdin at run with 'docker run -i -a=stdin' Upstream-commit: c77063afcd00f97c6922c41e509090dd4f8c2b95 Component: engine --- components/engine/commands_test.go | 58 ++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/components/engine/commands_test.go b/components/engine/commands_test.go index 734cefa278..90744b13e7 100644 --- a/components/engine/commands_test.go +++ b/components/engine/commands_test.go @@ -2,8 +2,11 @@ package docker import ( "bufio" + "bytes" "fmt" "io" + "io/ioutil" + "log" "strings" "testing" "time" @@ -105,6 +108,61 @@ func TestRunDisconnect(t *testing.T) { }) } +// TestAttachStdin checks attaching to stdin without stdout and stderr. +// 'docker run -i -a stdin' should sends the client's stdin to the command, +// then detach from it and print the container id. +func TestAttachStdin(t *testing.T) { + runtime, err := newTestRuntime() + if err != nil { + t.Fatal(err) + } + defer nuke(runtime) + srv := &Server{runtime: runtime} + + stdinR, stdinW := io.Pipe() + var stdout bytes.Buffer + + ch := make(chan struct{}) + go func() { + srv.CmdRun(stdinR, &stdout, "-i", "-a", "stdin", GetTestImage(runtime).Id, "sh", "-c", "echo hello; cat") + close(ch) + }() + + // Send input to the command, close stdin, wait for CmdRun to return + setTimeout(t, "Read/Write timed out", 2*time.Second, func() { + if _, err := stdinW.Write([]byte("hi there\n")); err != nil { + t.Fatal(err) + } + stdinW.Close() + <-ch + }) + + // Check output + cmdOutput := string(stdout.Bytes()) + container := runtime.List()[0] + if cmdOutput != container.ShortId()+"\n" { + t.Fatalf("Wrong output: should be '%s', not '%s'\n", container.ShortId()+"\n", cmdOutput) + } + + setTimeout(t, "Waiting for command to exit timed out", 2*time.Second, func() { + container.Wait() + }) + + // Check logs + if cmdLogs, err := container.ReadLog("stdout"); err != nil { + t.Fatal(err) + } else { + if output, err := ioutil.ReadAll(cmdLogs); err != nil { + t.Fatal(err) + } else { + expectedLog := "hello\nhi there\n" + if string(output) != expectedLog { + t.Fatalf("Unexpected logs: should be '%s', not '%s'\n", expectedLog, output) + } + } + } +} + // Expected behaviour, the process stays alive when the client disconnects func TestAttachDisconnect(t *testing.T) { runtime, err := newTestRuntime() From 7777dd857e3b2f87b0c58b449bcb73b7710e3467 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 2 Apr 2013 18:07:16 -0700 Subject: [PATCH 7/7] docker run [-a [stdin|stdout|stderr] [...]]: choose which streams to attach to when running a command. Fixes #234. Upstream-commit: c04af2a330991fcd52bcce213bcb863cff95d378 Component: engine --- components/engine/commands.go | 64 ++++++++++++++++++++++++---------- components/engine/container.go | 63 +++++++++++++++++++++------------ 2 files changed, 87 insertions(+), 40 deletions(-) diff --git a/components/engine/commands.go b/components/engine/commands.go index 299ae0e4f5..328f3b99a8 100644 --- a/components/engine/commands.go +++ b/components/engine/commands.go @@ -827,6 +827,33 @@ func (opts *ListOpts) Set(value string) error { return nil } +// AttachOpts stores arguments to 'docker run -a', eg. which streams to attach to +type AttachOpts map[string]bool + +func NewAttachOpts() *AttachOpts { + opts := make(map[string]bool) + return (*AttachOpts)(&opts) +} + +func (opts *AttachOpts) String() string { + return fmt.Sprint(*opts) +} + +func (opts *AttachOpts) Set(val string) error { + if val != "stdin" && val != "stdout" && val != "stderr" { + return fmt.Errorf("Unsupported stream name: %s", val) + } + (*opts)[val] = true + return nil +} + +func (opts *AttachOpts) Get(val string) bool { + if res, exists := (*opts)[val]; exists { + return res + } + return false +} + func (srv *Server) CmdTag(stdin io.ReadCloser, stdout io.Writer, args ...string) error { cmd := rcli.Subcmd(stdout, "tag", "[OPTIONS] IMAGE REPOSITORY [TAG]", "Tag an image into a repository") force := cmd.Bool("f", false, "Force") @@ -870,28 +897,29 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) return err } } - // Run the container - if !config.Detach { - var attachErr chan error - if config.OpenStdin { - Debugf("Attaching with stdin\n") - attachErr = container.Attach(stdin, stdout, stdout) - } else { - Debugf("Attaching without stdin\n") - attachErr = container.Attach(nil, stdout, nil) - } - Debugf("Starting\n") - if err := container.Start(); err != nil { - return err - } - Debugf("Waiting for attach to return\n") - return <-attachErr + var ( + cStdin io.Reader + cStdout, cStderr io.Writer + ) + if config.AttachStdin { + cStdin = stdin } + if config.AttachStdout { + cStdout = stdout + } + if config.AttachStderr { + cStderr = stdout // FIXME: rcli can't differentiate stdout from stderr + } + attachErr := container.Attach(cStdin, cStdout, cStderr) + Debugf("Starting\n") if err := container.Start(); err != nil { return err } - fmt.Fprintln(stdout, container.ShortId()) - return nil + if cStdout == nil && cStderr == nil { + fmt.Fprintln(stdout, container.ShortId()) + } + Debugf("Waiting for attach to return\n") + return <-attachErr } func NewServer() (*Server, error) { diff --git a/components/engine/container.go b/components/engine/container.go index a11539180e..c2b21e0c32 100644 --- a/components/engine/container.go +++ b/components/engine/container.go @@ -48,18 +48,20 @@ type Container struct { } type Config struct { - Hostname string - User string - Memory int64 // Memory limit (in bytes) - MemorySwap int64 // Total memory usage (memory + swap); set `-1' to disable swap - Detach bool - Ports []int - Tty bool // Attach standard streams to a tty, including stdin if it is not closed. - OpenStdin bool // Open stdin - StdinOnce bool // If true, close stdin after the 1 attached client disconnects. - Env []string - Cmd []string - Image string // Name of the image as it was passed by the operator (eg. could be symbolic) + Hostname string + User string + Memory int64 // Memory limit (in bytes) + MemorySwap int64 // Total memory usage (memory + swap); set `-1' to disable swap + AttachStdin bool + AttachStdout bool + AttachStderr bool + Ports []int + Tty bool // Attach standard streams to a tty, including stdin if it is not closed. + OpenStdin bool // Open stdin + StdinOnce bool // If true, close stdin after the 1 attached client disconnects. + Env []string + Cmd []string + Image string // Name of the image as it was passed by the operator (eg. could be symbolic) } func ParseRun(args []string, stdout io.Writer) (*Config, error) { @@ -70,6 +72,8 @@ func ParseRun(args []string, stdout io.Writer) (*Config, error) { flUser := cmd.String("u", "", "Username or UID") flDetach := cmd.Bool("d", false, "Detached mode: leave the container running in the background") + flAttach := NewAttachOpts() + cmd.Var(flAttach, "a", "Attach to stdin, stdout or stderr.") flStdin := cmd.Bool("i", false, "Keep stdin open even if not attached") flTty := cmd.Bool("t", false, "Allocate a pseudo-tty") flMemory := cmd.Int64("m", 0, "Memory limit (in bytes)") @@ -81,6 +85,19 @@ func ParseRun(args []string, stdout io.Writer) (*Config, error) { if err := cmd.Parse(args); err != nil { return nil, err } + if *flDetach && len(*flAttach) > 0 { + return nil, fmt.Errorf("Conflicting options: -a and -d") + } + // If neither -d or -a are set, attach to everything by default + if len(*flAttach) == 0 && !*flDetach { + if !*flDetach { + flAttach.Set("stdout") + flAttach.Set("stderr") + if *flStdin { + flAttach.Set("stdin") + } + } + } parsedArgs := cmd.Args() runCmd := []string{} image := "" @@ -91,18 +108,20 @@ func ParseRun(args []string, stdout io.Writer) (*Config, error) { runCmd = parsedArgs[1:] } config := &Config{ - Ports: flPorts, - User: *flUser, - Tty: *flTty, - OpenStdin: *flStdin, - Memory: *flMemory, - Detach: *flDetach, - Env: flEnv, - Cmd: runCmd, - Image: image, + Ports: flPorts, + User: *flUser, + Tty: *flTty, + OpenStdin: *flStdin, + Memory: *flMemory, + AttachStdin: flAttach.Get("stdin"), + AttachStdout: flAttach.Get("stdout"), + AttachStderr: flAttach.Get("stderr"), + Env: flEnv, + Cmd: runCmd, + Image: image, } // When allocating stdin in attached mode, close stdin at client disconnect - if config.OpenStdin && !config.Detach { + if config.OpenStdin && config.AttachStdin { config.StdinOnce = true } return config, nil