diff --git a/components/engine/Dockerfile b/components/engine/Dockerfile index c428761033..bc61bafb9b 100644 --- a/components/engine/Dockerfile +++ b/components/engine/Dockerfile @@ -62,6 +62,7 @@ RUN apt-get update && apt-get install -y \ libudev-dev \ mercurial \ net-tools \ + pigz \ pkg-config \ protobuf-compiler \ protobuf-c-compiler \ diff --git a/components/engine/Dockerfile.aarch64 b/components/engine/Dockerfile.aarch64 index 4302a5e1a4..e7f422d3ca 100644 --- a/components/engine/Dockerfile.aarch64 +++ b/components/engine/Dockerfile.aarch64 @@ -52,6 +52,7 @@ RUN apt-get update && apt-get install -y \ libudev-dev \ mercurial \ net-tools \ + pigz \ pkg-config \ protobuf-compiler \ protobuf-c-compiler \ diff --git a/components/engine/Dockerfile.armhf b/components/engine/Dockerfile.armhf index f489a6cde2..6764649637 100644 --- a/components/engine/Dockerfile.armhf +++ b/components/engine/Dockerfile.armhf @@ -45,6 +45,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/components/engine/Dockerfile.e2e b/components/engine/Dockerfile.e2e index 4dec3515a7..6c96c033ab 100644 --- a/components/engine/Dockerfile.e2e +++ b/components/engine/Dockerfile.e2e @@ -47,6 +47,7 @@ RUN apk add --update \ g++ \ git \ iptables \ + pigz \ tar \ xz \ && rm -rf /var/cache/apk/* diff --git a/components/engine/Dockerfile.ppc64le b/components/engine/Dockerfile.ppc64le index 106a3a6f37..15ee5f6d7a 100644 --- a/components/engine/Dockerfile.ppc64le +++ b/components/engine/Dockerfile.ppc64le @@ -46,6 +46,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/components/engine/Dockerfile.s390x b/components/engine/Dockerfile.s390x index e2f81a5d7d..57b7784420 100644 --- a/components/engine/Dockerfile.s390x +++ b/components/engine/Dockerfile.s390x @@ -42,6 +42,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/components/engine/Dockerfile.simple b/components/engine/Dockerfile.simple index d048d909cd..578bbb2196 100644 --- a/components/engine/Dockerfile.simple +++ b/components/engine/Dockerfile.simple @@ -28,6 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ e2fsprogs \ iptables \ pkg-config \ + pigz \ procps \ xfsprogs \ xz-utils \ diff --git a/components/engine/daemon/container_operations.go b/components/engine/daemon/container_operations.go index 1a1c7c4b3f..3900e95e8d 100644 --- a/components/engine/daemon/container_operations.go +++ b/components/engine/daemon/container_operations.go @@ -966,6 +966,9 @@ func (daemon *Daemon) releaseNetwork(container *container.Container) { logrus.Warnf("error locating sandbox id %s: %v", sid, err) return } + if err := sb.DisableService(); err != nil { + logrus.WithFields(logrus.Fields{"container": container.ID, "sandbox": sid}).WithError(err).Error("Error removing service from sandbox") + } if err := sb.Delete(); err != nil { logrus.Errorf("Error deleting sandbox id %s for container %s: %v", sid, container.ID, err) diff --git a/components/engine/daemon/graphdriver/lcow/remotefs_file.go b/components/engine/daemon/graphdriver/lcow/remotefs_file.go index c13431973c..138cea3c78 100644 --- a/components/engine/daemon/graphdriver/lcow/remotefs_file.go +++ b/components/engine/daemon/graphdriver/lcow/remotefs_file.go @@ -33,7 +33,7 @@ func (l *lcowfs) OpenFile(path string, flag int, perm os.FileMode) (_ driver.Fil flagStr := strconv.FormatInt(int64(flag), 10) permStr := strconv.FormatUint(uint64(perm), 8) - commandLine := fmt.Sprintf("%s %s %s %s", remotefs.RemotefsCmd, remotefs.OpenFileCmd, flagStr, permStr) + commandLine := fmt.Sprintf("%s %s %s %s %s", remotefs.RemotefsCmd, remotefs.OpenFileCmd, path, flagStr, permStr) env := make(map[string]string) env["PATH"] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:" processConfig := &hcsshim.ProcessConfig{ diff --git a/components/engine/pkg/archive/archive.go b/components/engine/pkg/archive/archive.go index a0c25937c8..5f7f562677 100644 --- a/components/engine/pkg/archive/archive.go +++ b/components/engine/pkg/archive/archive.go @@ -6,6 +6,7 @@ import ( "bytes" "compress/bzip2" "compress/gzip" + "context" "fmt" "io" "io/ioutil" @@ -13,6 +14,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strconv" "strings" "syscall" @@ -24,6 +26,17 @@ import ( "github.com/sirupsen/logrus" ) +var unpigzPath string + +func init() { + if path, err := exec.LookPath("unpigz"); err != nil { + logrus.Debug("unpigz binary not found in PATH, falling back to go gzip library") + } else { + logrus.Debugf("Using unpigz binary found at path %s", path) + unpigzPath = path + } +} + type ( // Compression is the state represents if compressed or not. Compression int @@ -136,10 +149,34 @@ func DetectCompression(source []byte) Compression { return Uncompressed } -func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) { +func xzDecompress(ctx context.Context, archive io.Reader) (io.ReadCloser, error) { args := []string{"xz", "-d", "-c", "-q"} - return cmdStream(exec.Command(args[0], args[1:]...), archive) + return cmdStream(exec.CommandContext(ctx, args[0], args[1:]...), archive) +} + +func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) { + if unpigzPath == "" { + return gzip.NewReader(buf) + } + + disablePigzEnv := os.Getenv("MOBY_DISABLE_PIGZ") + if disablePigzEnv != "" { + if disablePigz, err := strconv.ParseBool(disablePigzEnv); err != nil { + return nil, err + } else if disablePigz { + return gzip.NewReader(buf) + } + } + + return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf) +} + +func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { + return ioutils.NewReadCloserWrapper(readBuf, func() error { + cancel() + return readBuf.Close() + }) } // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive. @@ -163,26 +200,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { readBufWrapper := p.NewReadCloserWrapper(buf, buf) return readBufWrapper, nil case Gzip: - gzReader, err := gzip.NewReader(buf) + ctx, cancel := context.WithCancel(context.Background()) + + gzReader, err := gzDecompress(ctx, buf) if err != nil { + cancel() return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, gzReader) - return readBufWrapper, nil + return wrapReadCloser(readBufWrapper, cancel), nil case Bzip2: bz2Reader := bzip2.NewReader(buf) readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) return readBufWrapper, nil case Xz: - xzReader, chdone, err := xzDecompress(buf) + ctx, cancel := context.WithCancel(context.Background()) + + xzReader, err := xzDecompress(ctx, buf) if err != nil { + cancel() return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) - return ioutils.NewReadCloserWrapper(readBufWrapper, func() error { - <-chdone - return readBufWrapper.Close() - }), nil + return wrapReadCloser(readBufWrapper, cancel), nil default: return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) } @@ -1163,8 +1203,7 @@ func remapIDs(idMappings *idtools.IDMappings, hdr *tar.Header) error { // cmdStream executes a command, and returns its stdout as a stream. // If the command fails to run or doesn't complete successfully, an error // will be returned, including anything written on stderr. -func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) { - chdone := make(chan struct{}) +func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) { cmd.Stdin = input pipeR, pipeW := io.Pipe() cmd.Stdout = pipeW @@ -1173,7 +1212,7 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, // Run the command and return the pipe if err := cmd.Start(); err != nil { - return nil, nil, err + return nil, err } // Copy stdout to the returned pipe @@ -1183,10 +1222,9 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, } else { pipeW.Close() } - close(chdone) }() - return pipeR, chdone, nil + return pipeR, nil } // NewTempArchive reads the content of src into a temporary file, and returns the contents diff --git a/components/engine/pkg/archive/archive_test.go b/components/engine/pkg/archive/archive_test.go index 989557c53c..0e94f41c6d 100644 --- a/components/engine/pkg/archive/archive_test.go +++ b/components/engine/pkg/archive/archive_test.go @@ -3,6 +3,7 @@ package archive import ( "archive/tar" "bytes" + "compress/gzip" "fmt" "io" "io/ioutil" @@ -15,6 +16,7 @@ import ( "time" "github.com/docker/docker/pkg/idtools" + "github.com/docker/docker/pkg/ioutils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -87,7 +89,7 @@ func TestIsArchivePathTar(t *testing.T) { } } -func testDecompressStream(t *testing.T, ext, compressCommand string) { +func testDecompressStream(t *testing.T, ext, compressCommand string) io.Reader { cmd := exec.Command("sh", "-c", fmt.Sprintf("touch /tmp/archive && %s /tmp/archive", compressCommand)) output, err := cmd.CombinedOutput() @@ -111,6 +113,8 @@ func testDecompressStream(t *testing.T, ext, compressCommand string) { if err = r.Close(); err != nil { t.Fatalf("Failed to close the decompressed stream: %v ", err) } + + return r } func TestDecompressStreamGzip(t *testing.T) { @@ -206,7 +210,7 @@ func TestExtensionXz(t *testing.T) { func TestCmdStreamLargeStderr(t *testing.T) { cmd := exec.Command("sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello") - out, _, err := cmdStream(cmd, nil) + out, err := cmdStream(cmd, nil) if err != nil { t.Fatalf("Failed to start command: %s", err) } @@ -231,7 +235,7 @@ func TestCmdStreamBad(t *testing.T) { t.Skip("Failing on Windows CI machines") } badCmd := exec.Command("sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1") - out, _, err := cmdStream(badCmd, nil) + out, err := cmdStream(badCmd, nil) if err != nil { t.Fatalf("Failed to start command: %s", err) } @@ -246,7 +250,7 @@ func TestCmdStreamBad(t *testing.T) { func TestCmdStreamGood(t *testing.T) { cmd := exec.Command("sh", "-c", "echo hello; exit 0") - out, _, err := cmdStream(cmd, nil) + out, err := cmdStream(cmd, nil) if err != nil { t.Fatal(err) } @@ -1318,3 +1322,38 @@ func readFileFromArchive(t *testing.T, archive io.ReadCloser, name string, expec assert.NoError(t, err) return string(content) } + +func TestDisablePigz(t *testing.T) { + _, err := exec.LookPath("unpigz") + if err != nil { + t.Log("Test will not check full path when Pigz not installed") + } + + os.Setenv("MOBY_DISABLE_PIGZ", "true") + defer os.Unsetenv("MOBY_DISABLE_PIGZ") + + r := testDecompressStream(t, "gz", "gzip -f") + // For the bufio pool + outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) + // For the context canceller + contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + + assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) +} + +func TestPigz(t *testing.T) { + r := testDecompressStream(t, "gz", "gzip -f") + // For the bufio pool + outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) + // For the context canceller + contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + + _, err := exec.LookPath("unpigz") + if err == nil { + t.Log("Tested whether Pigz is used, as it installed") + assert.IsType(t, &io.PipeReader{}, contextReaderCloserWrapper.Reader) + } else { + t.Log("Tested whether Pigz is not used, as it not installed") + assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) + } +} diff --git a/components/engine/pkg/ioutils/readers.go b/components/engine/pkg/ioutils/readers.go index 63f3c07f46..168fa1d2d0 100644 --- a/components/engine/pkg/ioutils/readers.go +++ b/components/engine/pkg/ioutils/readers.go @@ -8,18 +8,22 @@ import ( "golang.org/x/net/context" ) -type readCloserWrapper struct { +// ReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser +// It calls the given callback function when closed. It should be constructed +// with NewReadCloserWrapper +type ReadCloserWrapper struct { io.Reader closer func() error } -func (r *readCloserWrapper) Close() error { +// Close calls back the passed closer function +func (r *ReadCloserWrapper) Close() error { return r.closer() } // NewReadCloserWrapper returns a new io.ReadCloser. func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { - return &readCloserWrapper{ + return &ReadCloserWrapper{ Reader: r, closer: closer, } diff --git a/components/engine/vendor.conf b/components/engine/vendor.conf index f52021dc99..b417be6951 100644 --- a/components/engine/vendor.conf +++ b/components/engine/vendor.conf @@ -31,7 +31,7 @@ github.com/moby/buildkit aaff9d591ef128560018433fe61beb802e149de8 github.com/tonistiigi/fsutil dea3a0da73aee887fc02142d995be764106ac5e2 #get libnetwork packages -github.com/docker/libnetwork a1dfea384b39779552a3b4837ea9303194950976 +github.com/docker/libnetwork 315a076a4e9ded2abc950318c71d5f1637547977 github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/components/engine/vendor/github.com/docker/libnetwork/endpoint.go b/components/engine/vendor/github.com/docker/libnetwork/endpoint.go index a2d1dbc4c6..1e1b6a1675 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/endpoint.go +++ b/components/engine/vendor/github.com/docker/libnetwork/endpoint.go @@ -311,16 +311,25 @@ func (ep *endpoint) isAnonymous() bool { return ep.anonymous } -// enableService sets ep's serviceEnabled to the passed value if it's not in the -// current state and returns true; false otherwise. -func (ep *endpoint) enableService(state bool) bool { +// isServiceEnabled check if service is enabled on the endpoint +func (ep *endpoint) isServiceEnabled() bool { ep.Lock() defer ep.Unlock() - if ep.serviceEnabled != state { - ep.serviceEnabled = state - return true - } - return false + return ep.serviceEnabled +} + +// enableService sets service enabled on the endpoint +func (ep *endpoint) enableService() { + ep.Lock() + defer ep.Unlock() + ep.serviceEnabled = true +} + +// disableService disables service on the endpoint +func (ep *endpoint) disableService() { + ep.Lock() + defer ep.Unlock() + ep.serviceEnabled = false } func (ep *endpoint) needResolver() bool { @@ -759,10 +768,6 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption) return err } - if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil { - logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e) - } - if e := ep.deleteDriverInfoFromCluster(); e != nil { logrus.Errorf("Could not delete endpoint state for endpoint %s from cluster: %v", ep.Name(), e) } diff --git a/components/engine/vendor/github.com/docker/libnetwork/sandbox.go b/components/engine/vendor/github.com/docker/libnetwork/sandbox.go index 315195ebb8..423066c128 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/sandbox.go +++ b/components/engine/vendor/github.com/docker/libnetwork/sandbox.go @@ -674,24 +674,41 @@ func (sb *sandbox) SetKey(basePath string) error { return nil } -func (sb *sandbox) EnableService() error { +func (sb *sandbox) EnableService() (err error) { logrus.Debugf("EnableService %s START", sb.containerID) + defer func() { + if err != nil { + sb.DisableService() + } + }() for _, ep := range sb.getConnectedEndpoints() { - if ep.enableService(true) { + if !ep.isServiceEnabled() { if err := ep.addServiceInfoToCluster(sb); err != nil { - ep.enableService(false) return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err) } + ep.enableService() } } logrus.Debugf("EnableService %s DONE", sb.containerID) return nil } -func (sb *sandbox) DisableService() error { +func (sb *sandbox) DisableService() (err error) { logrus.Debugf("DisableService %s START", sb.containerID) + failedEps := []string{} + defer func() { + if len(failedEps) > 0 { + err = fmt.Errorf("failed to disable service on sandbox:%s, for endpoints %s", sb.ID(), strings.Join(failedEps, ",")) + } + }() for _, ep := range sb.getConnectedEndpoints() { - ep.enableService(false) + if ep.isServiceEnabled() { + if err := ep.deleteServiceInfoFromCluster(sb, "DisableService"); err != nil { + failedEps = append(failedEps, ep.Name()) + logrus.Warnf("failed update state for endpoint %s into cluster: %v", ep.Name(), err) + } + ep.disableService() + } } logrus.Debugf("DisableService %s DONE", sb.containerID) return nil