From d1e42752e2ddf8b0a416d04b93a011d2a2c13b9c Mon Sep 17 00:00:00 2001 From: decentral1se Date: Mon, 18 Oct 2021 10:48:43 +0200 Subject: [PATCH] fix: set connection timeouts + clean up bad contexts Closes https://git.coopcloud.tech/coop-cloud/organising/issues/205. --- cli/server/add.go | 8 + go.mod | 1 + pkg/client/client.go | 5 +- pkg/client/commandconn/commandconn.go | 299 ++++++++++++++++++++++++++ pkg/client/connection.go | 40 +++- 5 files changed, 351 insertions(+), 2 deletions(-) create mode 100644 pkg/client/commandconn/commandconn.go diff --git a/cli/server/add.go b/cli/server/add.go index e218b280..894afd0a 100644 --- a/cli/server/add.go +++ b/cli/server/add.go @@ -118,6 +118,10 @@ All communication between Abra and the server will use this SSH connection. ctx := context.Background() cl, err := client.New(domainName) if err != nil { + logrus.Warn("cleaning up context due to connection failure") + if err := client.DeleteContext(domainName); err != nil { + logrus.Fatal(err) + } logrus.Fatal(err) } @@ -125,6 +129,10 @@ All communication between Abra and the server will use this SSH connection. if strings.Contains(err.Error(), "command not found") { logrus.Fatalf("docker is not installed on '%s'?", domainName) } else { + logrus.Warn("cleaning up context due to connection failure") + if err := client.DeleteContext(domainName); err != nil { + logrus.Fatal(err) + } logrus.Fatalf("unable to make a connection to '%s'?", domainName) } logrus.Debug(err) diff --git a/go.mod b/go.mod index efc70bf3..58014a3d 100644 --- a/go.mod +++ b/go.mod @@ -37,4 +37,5 @@ require ( github.com/opencontainers/runc v1.0.2 // indirect github.com/theupdateframework/notary v0.7.0 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect + golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 ) diff --git a/pkg/client/client.go b/pkg/client/client.go index 777a3643..2c595569 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -4,6 +4,7 @@ package client import ( "net/http" "os" + "time" "github.com/docker/docker/client" "github.com/sirupsen/logrus" @@ -46,9 +47,11 @@ func New(contextName string) (*client.Client, error) { clientOpts = append(clientOpts, client.WithAPIVersionNegotiation()) } + clientOpts = append(clientOpts, client.WithTimeout(3*time.Second)) + cl, err := client.NewClientWithOpts(clientOpts...) if err != nil { - logrus.Fatalf("unable to create Docker client: %s", err) + return nil, err } logrus.Debugf("created client for '%s'", contextName) diff --git a/pkg/client/commandconn/commandconn.go b/pkg/client/commandconn/commandconn.go new file mode 100644 index 00000000..7fce5ff8 --- /dev/null +++ b/pkg/client/commandconn/commandconn.go @@ -0,0 +1,299 @@ +// Package commandconn provides a net.Conn implementation that can be used for +// proxying (or emulating) stream via a custom command. +// +// For example, to provide an http.Client that can connect to a Docker daemon +// running in a Docker container ("DIND"): +// +// httpClient := &http.Client{ +// Transport: &http.Transport{ +// DialContext: func(ctx context.Context, _network, _addr string) (net.Conn, error) { +// return commandconn.New(ctx, "docker", "exec", "-it", containerID, "docker", "system", "dial-stdio") +// }, +// }, +// } + +package commandconn + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "os" + "runtime" + "strings" + "sync" + "syscall" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + exec "golang.org/x/sys/execabs" +) + +func setPdeathsig(cmd *exec.Cmd) { + cmd.SysProcAttr.Pdeathsig = syscall.SIGKILL +} + +func createSession(cmd *exec.Cmd) { + // for supporting ssh connection helper with ProxyCommand + // https://github.com/docker/cli/issues/1707 + cmd.SysProcAttr.Setsid = true +} + +// New returns net.Conn +func New(ctx context.Context, cmd string, args ...string) (net.Conn, error) { + var ( + c commandConn + err error + ) + c.cmd = exec.CommandContext(ctx, cmd, args...) + // we assume that args never contains sensitive information + logrus.Debugf("commandconn: starting %s with %v", cmd, args) + c.cmd.Env = os.Environ() + c.cmd.SysProcAttr = &syscall.SysProcAttr{} + setPdeathsig(c.cmd) + createSession(c.cmd) + c.stdin, err = c.cmd.StdinPipe() + if err != nil { + return nil, err + } + c.stdout, err = c.cmd.StdoutPipe() + if err != nil { + return nil, err + } + c.cmd.Stderr = &stderrWriter{ + stderrMu: &c.stderrMu, + stderr: &c.stderr, + debugPrefix: fmt.Sprintf("commandconn (%s):", cmd), + } + c.localAddr = dummyAddr{network: "dummy", s: "dummy-0"} + c.remoteAddr = dummyAddr{network: "dummy", s: "dummy-1"} + return &c, c.cmd.Start() +} + +// commandConn implements net.Conn +type commandConn struct { + cmd *exec.Cmd + cmdExited bool + cmdWaitErr error + cmdMutex sync.Mutex + stdin io.WriteCloser + stdout io.ReadCloser + stderrMu sync.Mutex + stderr bytes.Buffer + stdioClosedMu sync.Mutex // for stdinClosed and stdoutClosed + stdinClosed bool + stdoutClosed bool + localAddr net.Addr + remoteAddr net.Addr +} + +// killIfStdioClosed kills the cmd if both stdin and stdout are closed. +func (c *commandConn) killIfStdioClosed() error { + c.stdioClosedMu.Lock() + stdioClosed := c.stdoutClosed && c.stdinClosed + c.stdioClosedMu.Unlock() + if !stdioClosed { + return nil + } + return c.kill() +} + +// killAndWait tries sending SIGTERM to the process before sending SIGKILL. +func killAndWait(cmd *exec.Cmd) error { + var werr error + if runtime.GOOS != "windows" { + werrCh := make(chan error) + go func() { werrCh <- cmd.Wait() }() + cmd.Process.Signal(syscall.SIGTERM) + select { + case werr = <-werrCh: + case <-time.After(3 * time.Second): + cmd.Process.Kill() + werr = <-werrCh + } + } else { + cmd.Process.Kill() + werr = cmd.Wait() + } + return werr +} + +// kill returns nil if the command terminated, regardless to the exit status. +func (c *commandConn) kill() error { + var werr error + c.cmdMutex.Lock() + if c.cmdExited { + werr = c.cmdWaitErr + } else { + werr = killAndWait(c.cmd) + c.cmdWaitErr = werr + c.cmdExited = true + } + c.cmdMutex.Unlock() + if werr == nil { + return nil + } + wExitErr, ok := werr.(*exec.ExitError) + if ok { + if wExitErr.ProcessState.Exited() { + return nil + } + } + return errors.Wrapf(werr, "commandconn: failed to wait") +} + +func (c *commandConn) onEOF(eof error) error { + // when we got EOF, the command is going to be terminated + var werr error + c.cmdMutex.Lock() + if c.cmdExited { + werr = c.cmdWaitErr + } else { + werrCh := make(chan error) + go func() { werrCh <- c.cmd.Wait() }() + select { + case werr = <-werrCh: + c.cmdWaitErr = werr + c.cmdExited = true + case <-time.After(10 * time.Second): + c.cmdMutex.Unlock() + c.stderrMu.Lock() + stderr := c.stderr.String() + c.stderrMu.Unlock() + return errors.Errorf("command %v did not exit after %v: stderr=%q", c.cmd.Args, eof, stderr) + } + } + c.cmdMutex.Unlock() + if werr == nil { + return eof + } + c.stderrMu.Lock() + stderr := c.stderr.String() + c.stderrMu.Unlock() + return errors.Errorf("command %v has exited with %v, please make sure the URL is valid, and Docker 18.09 or later is installed on the remote host: stderr=%s", c.cmd.Args, werr, stderr) +} + +func ignorableCloseError(err error) bool { + errS := err.Error() + ss := []string{ + os.ErrClosed.Error(), + } + for _, s := range ss { + if strings.Contains(errS, s) { + return true + } + } + return false +} + +func (c *commandConn) CloseRead() error { + // NOTE: maybe already closed here + if err := c.stdout.Close(); err != nil && !ignorableCloseError(err) { + // TODO: muted because https://github.com/docker/compose/issues/8544 + // logrus.Warnf("commandConn.CloseRead: %v", err) + } + c.stdioClosedMu.Lock() + c.stdoutClosed = true + c.stdioClosedMu.Unlock() + if err := c.killIfStdioClosed(); err != nil { + // TODO: muted because https://github.com/docker/compose/issues/8544 + // logrus.Warnf("commandConn.CloseRead: %v", err) + } + return nil +} + +func (c *commandConn) Read(p []byte) (int, error) { + n, err := c.stdout.Read(p) + if err == io.EOF { + err = c.onEOF(err) + } + return n, err +} + +func (c *commandConn) CloseWrite() error { + // NOTE: maybe already closed here + if err := c.stdin.Close(); err != nil && !ignorableCloseError(err) { + // TODO: muted because https://github.com/docker/compose/issues/8544 + // logrus.Warnf("commandConn.CloseWrite: %v", err) + } + c.stdioClosedMu.Lock() + c.stdinClosed = true + c.stdioClosedMu.Unlock() + if err := c.killIfStdioClosed(); err != nil { + // TODO: muted because https://github.com/docker/compose/issues/8544 + // logrus.Warnf("commandConn.CloseWrite: %v", err) + } + return nil +} + +func (c *commandConn) Write(p []byte) (int, error) { + n, err := c.stdin.Write(p) + if err == io.EOF { + err = c.onEOF(err) + } + return n, err +} + +func (c *commandConn) Close() error { + var err error + if err = c.CloseRead(); err != nil { + logrus.Warnf("commandConn.Close: CloseRead: %v", err) + } + if err = c.CloseWrite(); err != nil { + // TODO: muted because https://github.com/docker/compose/issues/8544 + // logrus.Warnf("commandConn.Close: CloseWrite: %v", err) + } + return err +} + +func (c *commandConn) LocalAddr() net.Addr { + return c.localAddr +} +func (c *commandConn) RemoteAddr() net.Addr { + return c.remoteAddr +} +func (c *commandConn) SetDeadline(t time.Time) error { + logrus.Debugf("unimplemented call: SetDeadline(%v)", t) + return nil +} +func (c *commandConn) SetReadDeadline(t time.Time) error { + logrus.Debugf("unimplemented call: SetReadDeadline(%v)", t) + return nil +} +func (c *commandConn) SetWriteDeadline(t time.Time) error { + logrus.Debugf("unimplemented call: SetWriteDeadline(%v)", t) + return nil +} + +type dummyAddr struct { + network string + s string +} + +func (d dummyAddr) Network() string { + return d.network +} + +func (d dummyAddr) String() string { + return d.s +} + +type stderrWriter struct { + stderrMu *sync.Mutex + stderr *bytes.Buffer + debugPrefix string +} + +func (w *stderrWriter) Write(p []byte) (int, error) { + logrus.Debugf("%s%s", w.debugPrefix, string(p)) + w.stderrMu.Lock() + if w.stderr.Len() > 4096 { + w.stderr.Reset() + } + n, err := w.stderr.Write(p) + w.stderrMu.Unlock() + return n, err +} diff --git a/pkg/client/connection.go b/pkg/client/connection.go index ec8020d4..078175f5 100644 --- a/pkg/client/connection.go +++ b/pkg/client/connection.go @@ -1,15 +1,53 @@ package client import ( + "context" + "net" + "net/url" + + commandconnPkg "coopcloud.tech/abra/pkg/client/commandconn" "github.com/docker/cli/cli/connhelper" + "github.com/docker/cli/cli/connhelper/ssh" "github.com/docker/cli/cli/context/docker" dCliContextStore "github.com/docker/cli/cli/context/store" dClient "github.com/docker/docker/client" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) +// GetConnectionHelper returns Docker-specific connection helper for the given URL. +// GetConnectionHelper returns nil without error when no helper is registered for the scheme. +// +// ssh://@ URL requires Docker 18.09 or later on the remote host. +func GetConnectionHelper(daemonURL string) (*connhelper.ConnectionHelper, error) { + return getConnectionHelper(daemonURL, nil) +} + +func getConnectionHelper(daemonURL string, sshFlags []string) (*connhelper.ConnectionHelper, error) { + u, err := url.Parse(daemonURL) + if err != nil { + return nil, err + } + switch scheme := u.Scheme; scheme { + case "ssh": + sp, err := ssh.ParseURL(daemonURL) + if err != nil { + return nil, errors.Wrap(err, "ssh host connection is not valid") + } + return &connhelper.ConnectionHelper{ + Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) { + return commandconnPkg.New(ctx, "ssh", append(sshFlags, sp.Args("docker", "system", "dial-stdio")...)...) + }, + Host: daemonURL, + }, nil + } + // Future version may support plugins via ~/.docker/config.json. e.g. "dind" + // See docker/cli#889 for the previous discussion. + return nil, err +} + func newConnectionHelper(daemonURL string) *connhelper.ConnectionHelper { - helper, err := connhelper.GetConnectionHelper(daemonURL) + helper, err := GetConnectionHelper(daemonURL) if err != nil { logrus.Fatal(err) }