Revert "fix: drop copy/pasta, keep timeouts"
This reverts commit a170e26e27b373a73616bc4671c8c074644031a0. Attempting to add more nuanced timeout logic.
This commit is contained in:
parent
c76601c9ce
commit
fc2deda1f6
@ -6,7 +6,6 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/docker/cli/cli/connhelper"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@ -59,11 +58,3 @@ func New(contextName string) (*client.Client, error) {
|
||||
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
func newConnectionHelper(daemonURL string) *connhelper.ConnectionHelper {
|
||||
helper, err := connhelper.GetConnectionHelper(daemonURL)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
return helper
|
||||
}
|
||||
|
299
pkg/client/commandconn/commandconn.go
Normal file
299
pkg/client/commandconn/commandconn.go
Normal file
@ -0,0 +1,299 @@
|
||||
// Package commandconn provides a net.Conn implementation that can be used for
|
||||
// proxying (or emulating) stream via a custom command.
|
||||
//
|
||||
// For example, to provide an http.Client that can connect to a Docker daemon
|
||||
// running in a Docker container ("DIND"):
|
||||
//
|
||||
// httpClient := &http.Client{
|
||||
// Transport: &http.Transport{
|
||||
// DialContext: func(ctx context.Context, _network, _addr string) (net.Conn, error) {
|
||||
// return commandconn.New(ctx, "docker", "exec", "-it", containerID, "docker", "system", "dial-stdio")
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
|
||||
package commandconn
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
exec "golang.org/x/sys/execabs"
|
||||
)
|
||||
|
||||
func setPdeathsig(cmd *exec.Cmd) {
|
||||
cmd.SysProcAttr.Pdeathsig = syscall.SIGKILL
|
||||
}
|
||||
|
||||
func createSession(cmd *exec.Cmd) {
|
||||
// for supporting ssh connection helper with ProxyCommand
|
||||
// https://github.com/docker/cli/issues/1707
|
||||
cmd.SysProcAttr.Setsid = true
|
||||
}
|
||||
|
||||
// New returns net.Conn
|
||||
func New(ctx context.Context, cmd string, args ...string) (net.Conn, error) {
|
||||
var (
|
||||
c commandConn
|
||||
err error
|
||||
)
|
||||
c.cmd = exec.CommandContext(ctx, cmd, args...)
|
||||
// we assume that args never contains sensitive information
|
||||
logrus.Debugf("commandconn: starting %s with %v", cmd, args)
|
||||
c.cmd.Env = os.Environ()
|
||||
c.cmd.SysProcAttr = &syscall.SysProcAttr{}
|
||||
setPdeathsig(c.cmd)
|
||||
createSession(c.cmd)
|
||||
c.stdin, err = c.cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.stdout, err = c.cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.cmd.Stderr = &stderrWriter{
|
||||
stderrMu: &c.stderrMu,
|
||||
stderr: &c.stderr,
|
||||
debugPrefix: fmt.Sprintf("commandconn (%s):", cmd),
|
||||
}
|
||||
c.localAddr = dummyAddr{network: "dummy", s: "dummy-0"}
|
||||
c.remoteAddr = dummyAddr{network: "dummy", s: "dummy-1"}
|
||||
return &c, c.cmd.Start()
|
||||
}
|
||||
|
||||
// commandConn implements net.Conn
|
||||
type commandConn struct {
|
||||
cmd *exec.Cmd
|
||||
cmdExited bool
|
||||
cmdWaitErr error
|
||||
cmdMutex sync.Mutex
|
||||
stdin io.WriteCloser
|
||||
stdout io.ReadCloser
|
||||
stderrMu sync.Mutex
|
||||
stderr bytes.Buffer
|
||||
stdioClosedMu sync.Mutex // for stdinClosed and stdoutClosed
|
||||
stdinClosed bool
|
||||
stdoutClosed bool
|
||||
localAddr net.Addr
|
||||
remoteAddr net.Addr
|
||||
}
|
||||
|
||||
// killIfStdioClosed kills the cmd if both stdin and stdout are closed.
|
||||
func (c *commandConn) killIfStdioClosed() error {
|
||||
c.stdioClosedMu.Lock()
|
||||
stdioClosed := c.stdoutClosed && c.stdinClosed
|
||||
c.stdioClosedMu.Unlock()
|
||||
if !stdioClosed {
|
||||
return nil
|
||||
}
|
||||
return c.kill()
|
||||
}
|
||||
|
||||
// killAndWait tries sending SIGTERM to the process before sending SIGKILL.
|
||||
func killAndWait(cmd *exec.Cmd) error {
|
||||
var werr error
|
||||
if runtime.GOOS != "windows" {
|
||||
werrCh := make(chan error)
|
||||
go func() { werrCh <- cmd.Wait() }()
|
||||
cmd.Process.Signal(syscall.SIGTERM)
|
||||
select {
|
||||
case werr = <-werrCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
cmd.Process.Kill()
|
||||
werr = <-werrCh
|
||||
}
|
||||
} else {
|
||||
cmd.Process.Kill()
|
||||
werr = cmd.Wait()
|
||||
}
|
||||
return werr
|
||||
}
|
||||
|
||||
// kill returns nil if the command terminated, regardless to the exit status.
|
||||
func (c *commandConn) kill() error {
|
||||
var werr error
|
||||
c.cmdMutex.Lock()
|
||||
if c.cmdExited {
|
||||
werr = c.cmdWaitErr
|
||||
} else {
|
||||
werr = killAndWait(c.cmd)
|
||||
c.cmdWaitErr = werr
|
||||
c.cmdExited = true
|
||||
}
|
||||
c.cmdMutex.Unlock()
|
||||
if werr == nil {
|
||||
return nil
|
||||
}
|
||||
wExitErr, ok := werr.(*exec.ExitError)
|
||||
if ok {
|
||||
if wExitErr.ProcessState.Exited() {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.Wrapf(werr, "commandconn: failed to wait")
|
||||
}
|
||||
|
||||
func (c *commandConn) onEOF(eof error) error {
|
||||
// when we got EOF, the command is going to be terminated
|
||||
var werr error
|
||||
c.cmdMutex.Lock()
|
||||
if c.cmdExited {
|
||||
werr = c.cmdWaitErr
|
||||
} else {
|
||||
werrCh := make(chan error)
|
||||
go func() { werrCh <- c.cmd.Wait() }()
|
||||
select {
|
||||
case werr = <-werrCh:
|
||||
c.cmdWaitErr = werr
|
||||
c.cmdExited = true
|
||||
case <-time.After(10 * time.Second):
|
||||
c.cmdMutex.Unlock()
|
||||
c.stderrMu.Lock()
|
||||
stderr := c.stderr.String()
|
||||
c.stderrMu.Unlock()
|
||||
return errors.Errorf("command %v did not exit after %v: stderr=%q", c.cmd.Args, eof, stderr)
|
||||
}
|
||||
}
|
||||
c.cmdMutex.Unlock()
|
||||
if werr == nil {
|
||||
return eof
|
||||
}
|
||||
c.stderrMu.Lock()
|
||||
stderr := c.stderr.String()
|
||||
c.stderrMu.Unlock()
|
||||
return errors.Errorf("command %v has exited with %v, please make sure the URL is valid, and Docker 18.09 or later is installed on the remote host: stderr=%s", c.cmd.Args, werr, stderr)
|
||||
}
|
||||
|
||||
func ignorableCloseError(err error) bool {
|
||||
errS := err.Error()
|
||||
ss := []string{
|
||||
os.ErrClosed.Error(),
|
||||
}
|
||||
for _, s := range ss {
|
||||
if strings.Contains(errS, s) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *commandConn) CloseRead() error {
|
||||
// NOTE: maybe already closed here
|
||||
if err := c.stdout.Close(); err != nil && !ignorableCloseError(err) {
|
||||
// TODO: muted because https://github.com/docker/compose/issues/8544
|
||||
// logrus.Warnf("commandConn.CloseRead: %v", err)
|
||||
}
|
||||
c.stdioClosedMu.Lock()
|
||||
c.stdoutClosed = true
|
||||
c.stdioClosedMu.Unlock()
|
||||
if err := c.killIfStdioClosed(); err != nil {
|
||||
// TODO: muted because https://github.com/docker/compose/issues/8544
|
||||
// logrus.Warnf("commandConn.CloseRead: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandConn) Read(p []byte) (int, error) {
|
||||
n, err := c.stdout.Read(p)
|
||||
if err == io.EOF {
|
||||
err = c.onEOF(err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c *commandConn) CloseWrite() error {
|
||||
// NOTE: maybe already closed here
|
||||
if err := c.stdin.Close(); err != nil && !ignorableCloseError(err) {
|
||||
// TODO: muted because https://github.com/docker/compose/issues/8544
|
||||
// logrus.Warnf("commandConn.CloseWrite: %v", err)
|
||||
}
|
||||
c.stdioClosedMu.Lock()
|
||||
c.stdinClosed = true
|
||||
c.stdioClosedMu.Unlock()
|
||||
if err := c.killIfStdioClosed(); err != nil {
|
||||
// TODO: muted because https://github.com/docker/compose/issues/8544
|
||||
// logrus.Warnf("commandConn.CloseWrite: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandConn) Write(p []byte) (int, error) {
|
||||
n, err := c.stdin.Write(p)
|
||||
if err == io.EOF {
|
||||
err = c.onEOF(err)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c *commandConn) Close() error {
|
||||
var err error
|
||||
if err = c.CloseRead(); err != nil {
|
||||
logrus.Warnf("commandConn.Close: CloseRead: %v", err)
|
||||
}
|
||||
if err = c.CloseWrite(); err != nil {
|
||||
// TODO: muted because https://github.com/docker/compose/issues/8544
|
||||
// logrus.Warnf("commandConn.Close: CloseWrite: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *commandConn) LocalAddr() net.Addr {
|
||||
return c.localAddr
|
||||
}
|
||||
func (c *commandConn) RemoteAddr() net.Addr {
|
||||
return c.remoteAddr
|
||||
}
|
||||
func (c *commandConn) SetDeadline(t time.Time) error {
|
||||
logrus.Debugf("unimplemented call: SetDeadline(%v)", t)
|
||||
return nil
|
||||
}
|
||||
func (c *commandConn) SetReadDeadline(t time.Time) error {
|
||||
logrus.Debugf("unimplemented call: SetReadDeadline(%v)", t)
|
||||
return nil
|
||||
}
|
||||
func (c *commandConn) SetWriteDeadline(t time.Time) error {
|
||||
logrus.Debugf("unimplemented call: SetWriteDeadline(%v)", t)
|
||||
return nil
|
||||
}
|
||||
|
||||
type dummyAddr struct {
|
||||
network string
|
||||
s string
|
||||
}
|
||||
|
||||
func (d dummyAddr) Network() string {
|
||||
return d.network
|
||||
}
|
||||
|
||||
func (d dummyAddr) String() string {
|
||||
return d.s
|
||||
}
|
||||
|
||||
type stderrWriter struct {
|
||||
stderrMu *sync.Mutex
|
||||
stderr *bytes.Buffer
|
||||
debugPrefix string
|
||||
}
|
||||
|
||||
func (w *stderrWriter) Write(p []byte) (int, error) {
|
||||
logrus.Debugf("%s%s", w.debugPrefix, string(p))
|
||||
w.stderrMu.Lock()
|
||||
if w.stderr.Len() > 4096 {
|
||||
w.stderr.Reset()
|
||||
}
|
||||
n, err := w.stderr.Write(p)
|
||||
w.stderrMu.Unlock()
|
||||
return n, err
|
||||
}
|
83
pkg/client/connection.go
Normal file
83
pkg/client/connection.go
Normal file
@ -0,0 +1,83 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/url"
|
||||
|
||||
commandconnPkg "coopcloud.tech/abra/pkg/client/commandconn"
|
||||
"github.com/docker/cli/cli/connhelper"
|
||||
"github.com/docker/cli/cli/connhelper/ssh"
|
||||
"github.com/docker/cli/cli/context/docker"
|
||||
dCliContextStore "github.com/docker/cli/cli/context/store"
|
||||
dClient "github.com/docker/docker/client"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// GetConnectionHelper returns Docker-specific connection helper for the given URL.
|
||||
// GetConnectionHelper returns nil without error when no helper is registered for the scheme.
|
||||
//
|
||||
// ssh://<user>@<host> URL requires Docker 18.09 or later on the remote host.
|
||||
func GetConnectionHelper(daemonURL string) (*connhelper.ConnectionHelper, error) {
|
||||
return getConnectionHelper(daemonURL, nil)
|
||||
}
|
||||
|
||||
func getConnectionHelper(daemonURL string, sshFlags []string) (*connhelper.ConnectionHelper, error) {
|
||||
u, err := url.Parse(daemonURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch scheme := u.Scheme; scheme {
|
||||
case "ssh":
|
||||
sp, err := ssh.ParseURL(daemonURL)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "ssh host connection is not valid")
|
||||
}
|
||||
return &connhelper.ConnectionHelper{
|
||||
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return commandconnPkg.New(ctx, "ssh", append(sshFlags, sp.Args("docker", "system", "dial-stdio")...)...)
|
||||
},
|
||||
Host: "http://docker.example.com",
|
||||
}, nil
|
||||
}
|
||||
// Future version may support plugins via ~/.docker/config.json. e.g. "dind"
|
||||
// See docker/cli#889 for the previous discussion.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func newConnectionHelper(daemonURL string) *connhelper.ConnectionHelper {
|
||||
helper, err := GetConnectionHelper(daemonURL)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
return helper
|
||||
}
|
||||
|
||||
func getDockerEndpoint(host string) (docker.Endpoint, error) {
|
||||
skipTLSVerify := false
|
||||
ep := docker.Endpoint{
|
||||
EndpointMeta: docker.EndpointMeta{
|
||||
Host: host,
|
||||
SkipTLSVerify: skipTLSVerify,
|
||||
},
|
||||
}
|
||||
// try to resolve a docker client, validating the configuration
|
||||
opts, err := ep.ClientOpts()
|
||||
if err != nil {
|
||||
return docker.Endpoint{}, err
|
||||
}
|
||||
if _, err := dClient.NewClientWithOpts(opts...); err != nil {
|
||||
return docker.Endpoint{}, err
|
||||
}
|
||||
return ep, nil
|
||||
}
|
||||
|
||||
func getDockerEndpointMetadataAndTLS(host string) (docker.EndpointMeta, *dCliContextStore.EndpointTLSData, error) {
|
||||
ep, err := getDockerEndpoint(host)
|
||||
if err != nil {
|
||||
return docker.EndpointMeta{}, nil, err
|
||||
}
|
||||
return ep.EndpointMeta, ep.TLSData.ToStoreTLSData(), nil
|
||||
}
|
@ -9,9 +9,7 @@ import (
|
||||
context "github.com/docker/cli/cli/context"
|
||||
"github.com/docker/cli/cli/context/docker"
|
||||
contextStore "github.com/docker/cli/cli/context/store"
|
||||
dCliContextStore "github.com/docker/cli/cli/context/store"
|
||||
cliflags "github.com/docker/cli/cli/flags"
|
||||
dClient "github.com/docker/docker/client"
|
||||
"github.com/moby/term"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@ -127,30 +125,3 @@ func NewDefaultDockerContextStore() *command.ContextStoreWithDefault {
|
||||
|
||||
return dockerContextStore
|
||||
}
|
||||
|
||||
func getDockerEndpointMetadataAndTLS(host string) (docker.EndpointMeta, *dCliContextStore.EndpointTLSData, error) {
|
||||
ep, err := getDockerEndpoint(host)
|
||||
if err != nil {
|
||||
return docker.EndpointMeta{}, nil, err
|
||||
}
|
||||
return ep.EndpointMeta, ep.TLSData.ToStoreTLSData(), nil
|
||||
}
|
||||
|
||||
func getDockerEndpoint(host string) (docker.Endpoint, error) {
|
||||
skipTLSVerify := false
|
||||
ep := docker.Endpoint{
|
||||
EndpointMeta: docker.EndpointMeta{
|
||||
Host: host,
|
||||
SkipTLSVerify: skipTLSVerify,
|
||||
},
|
||||
}
|
||||
// try to resolve a docker client, validating the configuration
|
||||
opts, err := ep.ClientOpts()
|
||||
if err != nil {
|
||||
return docker.Endpoint{}, err
|
||||
}
|
||||
if _, err := dClient.NewClientWithOpts(opts...); err != nil {
|
||||
return docker.Endpoint{}, err
|
||||
}
|
||||
return ep, nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user