Windows: new hcsshim stdin/out/err handling

Signed-off-by: John Howard <jhoward@microsoft.com>
Upstream-commit: ec5a73d18eb6a5241cdaa0b5d473abb085d6f491
Component: engine
This commit is contained in:
John Starks
2015-07-18 17:15:02 -07:00
committed by John Howard
parent 683400e786
commit 84c4707817
16 changed files with 214 additions and 1385 deletions

View File

@ -8,22 +8,16 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/execdriver"
"github.com/docker/docker/pkg/stringid"
"github.com/microsoft/hcsshim"
"github.com/natefinch/npipe"
)
// Exec implements the exec driver Driver interface.
func (d *Driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessConfig, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (int, error) {
var (
inListen, outListen, errListen *npipe.PipeListener
term execdriver.Terminal
err error
randomID = stringid.GenerateNonCryptoID()
serverPipeFormat, clientPipeFormat string
pid uint32
exitCode int32
term execdriver.Terminal
err error
exitCode int32
)
active := d.activeContainers[c.ID]
@ -39,64 +33,6 @@ func (d *Driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
// Configure the environment for the process // Note NOT c.ProcessConfig.Tty
createProcessParms.Environment = setupEnvironmentVariables(processConfig.Env)
// We use another unique ID here for each exec instance otherwise it
// may conflict with the pipe name being used by RUN.
// We use a different pipe name between real and dummy mode in the HCS
if dummyMode {
clientPipeFormat = `\\.\pipe\docker-exec-%[1]s-%[2]s-%[3]s`
serverPipeFormat = clientPipeFormat
} else {
clientPipeFormat = `\\.\pipe\docker-exec-%[2]s-%[3]s`
serverPipeFormat = `\\.\Containers\%[1]s\Device\NamedPipe\docker-exec-%[2]s-%[3]s`
}
// Connect stdin
if pipes.Stdin != nil {
stdInPipe := fmt.Sprintf(serverPipeFormat, c.ID, randomID, "stdin")
createProcessParms.StdInPipe = fmt.Sprintf(clientPipeFormat, c.ID, randomID, "stdin")
// Listen on the named pipe
inListen, err = npipe.Listen(stdInPipe)
if err != nil {
logrus.Errorf("stdin failed to listen on %s %s ", stdInPipe, err)
return -1, err
}
defer inListen.Close()
// Launch a goroutine to do the accept. We do this so that we can
// cause an otherwise blocking goroutine to gracefully close when
// the caller (us) closes the listener
go stdinAccept(inListen, stdInPipe, pipes.Stdin)
}
// Connect stdout
stdOutPipe := fmt.Sprintf(serverPipeFormat, c.ID, randomID, "stdout")
createProcessParms.StdOutPipe = fmt.Sprintf(clientPipeFormat, c.ID, randomID, "stdout")
outListen, err = npipe.Listen(stdOutPipe)
if err != nil {
logrus.Errorf("stdout failed to listen on %s %s", stdOutPipe, err)
return -1, err
}
defer outListen.Close()
go stdouterrAccept(outListen, stdOutPipe, pipes.Stdout)
// No stderr on TTY. Note NOT c.ProcessConfig.Tty
if !processConfig.Tty {
// Connect stderr
stdErrPipe := fmt.Sprintf(serverPipeFormat, c.ID, randomID, "stderr")
createProcessParms.StdErrPipe = fmt.Sprintf(clientPipeFormat, c.ID, randomID, "stderr")
errListen, err = npipe.Listen(stdErrPipe)
if err != nil {
logrus.Errorf("Stderr failed to listen on %s %s", stdErrPipe, err)
return -1, err
}
defer errListen.Close()
go stdouterrAccept(errListen, stdErrPipe, pipes.Stderr)
}
// While this should get caught earlier, just in case, validate that we
// have something to run.
if processConfig.Entrypoint == "" {
@ -114,13 +50,16 @@ func (d *Driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
logrus.Debugln("commandLine: ", createProcessParms.CommandLine)
// Start the command running in the container.
pid, err = hcsshim.CreateProcessInComputeSystem(c.ID, createProcessParms)
pid, stdin, stdout, stderr, err := hcsshim.CreateProcessInComputeSystem(c.ID, pipes.Stdin != nil, true, !processConfig.Tty, createProcessParms)
if err != nil {
logrus.Errorf("CreateProcessInComputeSystem() failed %s", err)
return -1, err
}
// Now that the process has been launched, begin copying data to and from
// the named pipes for the std handles.
setupPipes(stdin, stdout, stderr, pipes)
// Note NOT c.ProcessConfig.Tty
if processConfig.Tty {
term = NewTtyConsole(c.ID, pid)

View File

@ -6,77 +6,49 @@ import (
"io"
"github.com/Sirupsen/logrus"
"github.com/natefinch/npipe"
"github.com/docker/docker/daemon/execdriver"
)
// stdinAccept runs as a go function. It waits for the container system
// to accept our offer of a named pipe for stdin. Once accepted, if we are
// running "attached" to the container (eg docker run -i), then we spin up
// a further thread to copy anything from the client into the container.
//
// Important design note. This function is run as a go function for a very
// good reason. The named pipe Accept call is blocking until one of two things
// happen. Either someone connects to it, or it is forcibly closed. Let's
// assume that no-one connects to it, the only way otherwise the Run()
// method would continue is by closing it. However, as that would be the same
// thread, it can't close it. Hence we run as another thread allowing Run()
// to close the named pipe.
func stdinAccept(inListen *npipe.PipeListener, pipeName string, copyfrom io.ReadCloser) {
// General comment. Handling I/O for a container is very different to Linux.
// We use a named pipe to HCS to copy I/O both in and out of the container,
// very similar to how docker daemon communicates with a CLI.
// Wait for the pipe to be connected to by the shim
logrus.Debugln("stdinAccept: Waiting on ", pipeName)
stdinConn, err := inListen.Accept()
if err != nil {
logrus.Errorf("Failed to accept on pipe %s %s", pipeName, err)
return
}
logrus.Debugln("Connected to ", stdinConn.RemoteAddr())
// startStdinCopy asynchronously copies an io.Reader to the container's
// process's stdin pipe and closes the pipe when there is no more data to copy.
func startStdinCopy(dst io.WriteCloser, src io.Reader) {
// Anything that comes from the client stdin should be copied
// across to the stdin named pipe of the container.
if copyfrom != nil {
go func() {
defer stdinConn.Close()
logrus.Debugln("Calling io.Copy on stdin")
bytes, err := io.Copy(stdinConn, copyfrom)
logrus.Debugf("Finished io.Copy on stdin bytes=%d err=%s pipe=%s", bytes, err, stdinConn.RemoteAddr())
}()
} else {
defer stdinConn.Close()
}
go func() {
defer dst.Close()
bytes, err := io.Copy(dst, src)
logrus.Debugf("Copied %d bytes from stdin err=%s", bytes, err)
}()
}
// stdouterrAccept runs as a go function. It waits for the container system to
// accept our offer of a named pipe - in fact two of them - one for stdout
// and one for stderr (we are called twice). Once the named pipe is accepted,
// if we are running "attached" to the container (eg docker run -i), then we
// spin up a further thread to copy anything from the containers output channels
// to the client.
func stdouterrAccept(outerrListen *npipe.PipeListener, pipeName string, copyto io.Writer) {
// Wait for the pipe to be connected to by the shim
logrus.Debugln("out/err: Waiting on ", pipeName)
outerrConn, err := outerrListen.Accept()
if err != nil {
logrus.Errorf("Failed to accept on pipe %s %s", pipeName, err)
return
}
logrus.Debugln("Connected to ", outerrConn.RemoteAddr())
// startStdouterrCopy asynchronously copies data from the container's process's
// stdout or stderr pipe to an io.Writer and closes the pipe when there is no
// more data to copy.
func startStdouterrCopy(dst io.Writer, src io.ReadCloser, name string) {
// Anything that comes from the container named pipe stdout/err should be copied
// across to the stdout/err of the client
if copyto != nil {
go func() {
defer outerrConn.Close()
logrus.Debugln("Calling io.Copy on ", pipeName)
bytes, err := io.Copy(copyto, outerrConn)
logrus.Debugf("Copied %d bytes from pipe=%s", bytes, outerrConn.RemoteAddr())
if err != nil {
// Not fatal, just debug log it
logrus.Debugf("Error hit during copy %s", err)
}
}()
} else {
defer outerrConn.Close()
go func() {
defer src.Close()
bytes, err := io.Copy(dst, src)
logrus.Debugf("Copied %d bytes from %s err=%s", bytes, name, err)
}()
}
// setupPipes starts the asynchronous copying of data to and from the named
// pipes used byt he HCS for the std handles.
func setupPipes(stdin io.WriteCloser, stdout, stderr io.ReadCloser, pipes *execdriver.Pipes) {
if stdin != nil {
startStdinCopy(stdin, pipes.Stdin)
}
if stdout != nil {
startStdouterrCopy(pipes.Stdout, stdout, "stdout")
}
if stderr != nil {
startStdouterrCopy(pipes.Stderr, stderr, "stderr")
}
}

View File

@ -15,7 +15,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/execdriver"
"github.com/microsoft/hcsshim"
"github.com/natefinch/npipe"
)
// defaultContainerNAT is the default name of the container NAT device that is
@ -60,23 +59,28 @@ type device struct {
}
type containerInit struct {
SystemType string
Name string
IsDummy bool
VolumePath string
Devices []device
IgnoreFlushesDuringBoot bool
LayerFolderPath string
Layers []layer
SystemType string // HCS requires this to be hard-coded to "Container"
Name string // Name of the container. We use the docker ID.
Owner string // The management platform that created this container
IsDummy bool // Used for development purposes.
VolumePath string // Windows volume path for scratch space
Devices []device // Devices used by the container
IgnoreFlushesDuringBoot bool // Optimisation hint for container startup in Windows
LayerFolderPath string // Where the layer folders are located
Layers []layer // List of storage layers
}
// defaultOwner is a tag passed to HCS to allow it to differentiate between
// container creator management stacks. We hard code "docker" in the case
// of docker.
const defaultOwner = "docker"
// Run implements the exec driver Driver interface
func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (execdriver.ExitStatus, error) {
var (
term execdriver.Terminal
err error
inListen, outListen, errListen *npipe.PipeListener
term execdriver.Terminal
err error
)
// Make sure the client isn't asking for options which aren't supported
@ -88,6 +92,7 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, startCallba
cu := &containerInit{
SystemType: "Container",
Name: c.ID,
Owner: defaultOwner,
IsDummy: dummyMode,
VolumePath: c.Rootfs,
IgnoreFlushesDuringBoot: c.FirstStart,
@ -224,16 +229,6 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, startCallba
}
}()
// We use a different pipe name between real and dummy mode in the HCS
var serverPipeFormat, clientPipeFormat string
if dummyMode {
clientPipeFormat = `\\.\pipe\docker-run-%[1]s-%[2]s`
serverPipeFormat = clientPipeFormat
} else {
clientPipeFormat = `\\.\pipe\docker-run-%[2]s`
serverPipeFormat = `\\.\Containers\%[1]s\Device\NamedPipe\docker-run-%[2]s`
}
createProcessParms := hcsshim.CreateProcessParams{
EmulateConsole: c.ProcessConfig.Tty,
WorkingDirectory: c.WorkingDir,
@ -243,51 +238,6 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, startCallba
// Configure the environment for the process
createProcessParms.Environment = setupEnvironmentVariables(c.ProcessConfig.Env)
// Connect stdin
if pipes.Stdin != nil {
stdInPipe := fmt.Sprintf(serverPipeFormat, c.ID, "stdin")
createProcessParms.StdInPipe = fmt.Sprintf(clientPipeFormat, c.ID, "stdin")
// Listen on the named pipe
inListen, err = npipe.Listen(stdInPipe)
if err != nil {
logrus.Errorf("stdin failed to listen on %s err=%s", stdInPipe, err)
return execdriver.ExitStatus{ExitCode: -1}, err
}
defer inListen.Close()
// Launch a goroutine to do the accept. We do this so that we can
// cause an otherwise blocking goroutine to gracefully close when
// the caller (us) closes the listener
go stdinAccept(inListen, stdInPipe, pipes.Stdin)
}
// Connect stdout
stdOutPipe := fmt.Sprintf(serverPipeFormat, c.ID, "stdout")
createProcessParms.StdOutPipe = fmt.Sprintf(clientPipeFormat, c.ID, "stdout")
outListen, err = npipe.Listen(stdOutPipe)
if err != nil {
logrus.Errorf("stdout failed to listen on %s err=%s", stdOutPipe, err)
return execdriver.ExitStatus{ExitCode: -1}, err
}
defer outListen.Close()
go stdouterrAccept(outListen, stdOutPipe, pipes.Stdout)
// No stderr on TTY.
if !c.ProcessConfig.Tty {
// Connect stderr
stdErrPipe := fmt.Sprintf(serverPipeFormat, c.ID, "stderr")
createProcessParms.StdErrPipe = fmt.Sprintf(clientPipeFormat, c.ID, "stderr")
errListen, err = npipe.Listen(stdErrPipe)
if err != nil {
logrus.Errorf("stderr failed to listen on %s err=%s", stdErrPipe, err)
return execdriver.ExitStatus{ExitCode: -1}, err
}
defer errListen.Close()
go stdouterrAccept(errListen, stdErrPipe, pipes.Stderr)
}
// This should get caught earlier, but just in case - validate that we
// have something to run
if c.ProcessConfig.Entrypoint == "" {
@ -305,14 +255,16 @@ func (d *Driver) Run(c *execdriver.Command, pipes *execdriver.Pipes, startCallba
logrus.Debugf("CommandLine: %s", createProcessParms.CommandLine)
// Start the command running in the container.
var pid uint32
pid, err = hcsshim.CreateProcessInComputeSystem(c.ID, createProcessParms)
pid, stdin, stdout, stderr, err := hcsshim.CreateProcessInComputeSystem(c.ID, pipes.Stdin != nil, true, !c.ProcessConfig.Tty, createProcessParms)
if err != nil {
logrus.Errorf("CreateProcessInComputeSystem() failed %s", err)
return execdriver.ExitStatus{ExitCode: -1}, err
}
// Now that the process has been launched, begin copying data to and from
// the named pipes for the std handles.
setupPipes(stdin, stdout, stderr, pipes)
//Save the PID as we'll need this in Kill()
logrus.Debugf("PID %d", pid)
c.ContainerPid = int(pid)