From e9c2f9fe7767f6893998ef723acbf9c866dbfa69 Mon Sep 17 00:00:00 2001 From: Sargun Dhillon Date: Tue, 16 Jan 2018 10:49:18 -0800 Subject: [PATCH 1/4] Make image (layer) downloads faster by using pigz The Golang built-in gzip library is serialized, and fairly slow at decompressing. It also only decompresses on demand, versus pipelining decompression. This change switches to using the pigz external command for gzip decompression, as opposed to using the built-in golang one. This code is not vendored, but will be used if it autodetected as part of the OS. This also switches to using context, versus a manually managed channel to manage cancellations, and synchronization. There is a little bit of weirdness around manually having to cancel in the error cases. Signed-off-by: Sargun Dhillon Upstream-commit: fd35494a251a497c359f706f61f33e689e2af678 Component: engine --- components/engine/Dockerfile | 1 + components/engine/Dockerfile.aarch64 | 1 + components/engine/Dockerfile.armhf | 1 + components/engine/Dockerfile.e2e | 1 + components/engine/Dockerfile.ppc64le | 1 + components/engine/Dockerfile.s390x | 1 + components/engine/Dockerfile.simple | 1 + components/engine/pkg/archive/archive.go | 66 +++++++++++++++---- components/engine/pkg/archive/archive_test.go | 47 +++++++++++-- components/engine/pkg/ioutils/readers.go | 10 ++- 10 files changed, 109 insertions(+), 21 deletions(-) diff --git a/components/engine/Dockerfile b/components/engine/Dockerfile index 2a8f9181e6..999a0f9671 100644 --- a/components/engine/Dockerfile +++ b/components/engine/Dockerfile @@ -62,6 +62,7 @@ RUN apt-get update && apt-get install -y \ libudev-dev \ mercurial \ net-tools \ + pigz \ pkg-config \ protobuf-compiler \ protobuf-c-compiler \ diff --git a/components/engine/Dockerfile.aarch64 b/components/engine/Dockerfile.aarch64 index fde0c706fb..32bb26504a 100644 --- a/components/engine/Dockerfile.aarch64 +++ b/components/engine/Dockerfile.aarch64 @@ -52,6 +52,7 @@ RUN apt-get update && apt-get install -y \ libudev-dev \ mercurial \ net-tools \ + pigz \ pkg-config \ protobuf-compiler \ protobuf-c-compiler \ diff --git a/components/engine/Dockerfile.armhf b/components/engine/Dockerfile.armhf index b5a4d938a4..85266f9390 100644 --- a/components/engine/Dockerfile.armhf +++ b/components/engine/Dockerfile.armhf @@ -45,6 +45,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/components/engine/Dockerfile.e2e b/components/engine/Dockerfile.e2e index 4dec3515a7..6c96c033ab 100644 --- a/components/engine/Dockerfile.e2e +++ b/components/engine/Dockerfile.e2e @@ -47,6 +47,7 @@ RUN apk add --update \ g++ \ git \ iptables \ + pigz \ tar \ xz \ && rm -rf /var/cache/apk/* diff --git a/components/engine/Dockerfile.ppc64le b/components/engine/Dockerfile.ppc64le index 106a3a6f37..15ee5f6d7a 100644 --- a/components/engine/Dockerfile.ppc64le +++ b/components/engine/Dockerfile.ppc64le @@ -46,6 +46,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/components/engine/Dockerfile.s390x b/components/engine/Dockerfile.s390x index e2f81a5d7d..57b7784420 100644 --- a/components/engine/Dockerfile.s390x +++ b/components/engine/Dockerfile.s390x @@ -42,6 +42,7 @@ RUN apt-get update && apt-get install -y \ libtool \ libudev-dev \ mercurial \ + pigz \ pkg-config \ python-backports.ssl-match-hostname \ python-dev \ diff --git a/components/engine/Dockerfile.simple b/components/engine/Dockerfile.simple index d048d909cd..578bbb2196 100644 --- a/components/engine/Dockerfile.simple +++ b/components/engine/Dockerfile.simple @@ -28,6 +28,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ e2fsprogs \ iptables \ pkg-config \ + pigz \ procps \ xfsprogs \ xz-utils \ diff --git a/components/engine/pkg/archive/archive.go b/components/engine/pkg/archive/archive.go index a0c25937c8..5f7f562677 100644 --- a/components/engine/pkg/archive/archive.go +++ b/components/engine/pkg/archive/archive.go @@ -6,6 +6,7 @@ import ( "bytes" "compress/bzip2" "compress/gzip" + "context" "fmt" "io" "io/ioutil" @@ -13,6 +14,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strconv" "strings" "syscall" @@ -24,6 +26,17 @@ import ( "github.com/sirupsen/logrus" ) +var unpigzPath string + +func init() { + if path, err := exec.LookPath("unpigz"); err != nil { + logrus.Debug("unpigz binary not found in PATH, falling back to go gzip library") + } else { + logrus.Debugf("Using unpigz binary found at path %s", path) + unpigzPath = path + } +} + type ( // Compression is the state represents if compressed or not. Compression int @@ -136,10 +149,34 @@ func DetectCompression(source []byte) Compression { return Uncompressed } -func xzDecompress(archive io.Reader) (io.ReadCloser, <-chan struct{}, error) { +func xzDecompress(ctx context.Context, archive io.Reader) (io.ReadCloser, error) { args := []string{"xz", "-d", "-c", "-q"} - return cmdStream(exec.Command(args[0], args[1:]...), archive) + return cmdStream(exec.CommandContext(ctx, args[0], args[1:]...), archive) +} + +func gzDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) { + if unpigzPath == "" { + return gzip.NewReader(buf) + } + + disablePigzEnv := os.Getenv("MOBY_DISABLE_PIGZ") + if disablePigzEnv != "" { + if disablePigz, err := strconv.ParseBool(disablePigzEnv); err != nil { + return nil, err + } else if disablePigz { + return gzip.NewReader(buf) + } + } + + return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf) +} + +func wrapReadCloser(readBuf io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { + return ioutils.NewReadCloserWrapper(readBuf, func() error { + cancel() + return readBuf.Close() + }) } // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive. @@ -163,26 +200,29 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { readBufWrapper := p.NewReadCloserWrapper(buf, buf) return readBufWrapper, nil case Gzip: - gzReader, err := gzip.NewReader(buf) + ctx, cancel := context.WithCancel(context.Background()) + + gzReader, err := gzDecompress(ctx, buf) if err != nil { + cancel() return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, gzReader) - return readBufWrapper, nil + return wrapReadCloser(readBufWrapper, cancel), nil case Bzip2: bz2Reader := bzip2.NewReader(buf) readBufWrapper := p.NewReadCloserWrapper(buf, bz2Reader) return readBufWrapper, nil case Xz: - xzReader, chdone, err := xzDecompress(buf) + ctx, cancel := context.WithCancel(context.Background()) + + xzReader, err := xzDecompress(ctx, buf) if err != nil { + cancel() return nil, err } readBufWrapper := p.NewReadCloserWrapper(buf, xzReader) - return ioutils.NewReadCloserWrapper(readBufWrapper, func() error { - <-chdone - return readBufWrapper.Close() - }), nil + return wrapReadCloser(readBufWrapper, cancel), nil default: return nil, fmt.Errorf("Unsupported compression format %s", (&compression).Extension()) } @@ -1163,8 +1203,7 @@ func remapIDs(idMappings *idtools.IDMappings, hdr *tar.Header) error { // cmdStream executes a command, and returns its stdout as a stream. // If the command fails to run or doesn't complete successfully, an error // will be returned, including anything written on stderr. -func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, error) { - chdone := make(chan struct{}) +func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, error) { cmd.Stdin = input pipeR, pipeW := io.Pipe() cmd.Stdout = pipeW @@ -1173,7 +1212,7 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, // Run the command and return the pipe if err := cmd.Start(); err != nil { - return nil, nil, err + return nil, err } // Copy stdout to the returned pipe @@ -1183,10 +1222,9 @@ func cmdStream(cmd *exec.Cmd, input io.Reader) (io.ReadCloser, <-chan struct{}, } else { pipeW.Close() } - close(chdone) }() - return pipeR, chdone, nil + return pipeR, nil } // NewTempArchive reads the content of src into a temporary file, and returns the contents diff --git a/components/engine/pkg/archive/archive_test.go b/components/engine/pkg/archive/archive_test.go index 989557c53c..0e94f41c6d 100644 --- a/components/engine/pkg/archive/archive_test.go +++ b/components/engine/pkg/archive/archive_test.go @@ -3,6 +3,7 @@ package archive import ( "archive/tar" "bytes" + "compress/gzip" "fmt" "io" "io/ioutil" @@ -15,6 +16,7 @@ import ( "time" "github.com/docker/docker/pkg/idtools" + "github.com/docker/docker/pkg/ioutils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -87,7 +89,7 @@ func TestIsArchivePathTar(t *testing.T) { } } -func testDecompressStream(t *testing.T, ext, compressCommand string) { +func testDecompressStream(t *testing.T, ext, compressCommand string) io.Reader { cmd := exec.Command("sh", "-c", fmt.Sprintf("touch /tmp/archive && %s /tmp/archive", compressCommand)) output, err := cmd.CombinedOutput() @@ -111,6 +113,8 @@ func testDecompressStream(t *testing.T, ext, compressCommand string) { if err = r.Close(); err != nil { t.Fatalf("Failed to close the decompressed stream: %v ", err) } + + return r } func TestDecompressStreamGzip(t *testing.T) { @@ -206,7 +210,7 @@ func TestExtensionXz(t *testing.T) { func TestCmdStreamLargeStderr(t *testing.T) { cmd := exec.Command("sh", "-c", "dd if=/dev/zero bs=1k count=1000 of=/dev/stderr; echo hello") - out, _, err := cmdStream(cmd, nil) + out, err := cmdStream(cmd, nil) if err != nil { t.Fatalf("Failed to start command: %s", err) } @@ -231,7 +235,7 @@ func TestCmdStreamBad(t *testing.T) { t.Skip("Failing on Windows CI machines") } badCmd := exec.Command("sh", "-c", "echo hello; echo >&2 error couldn\\'t reverse the phase pulser; exit 1") - out, _, err := cmdStream(badCmd, nil) + out, err := cmdStream(badCmd, nil) if err != nil { t.Fatalf("Failed to start command: %s", err) } @@ -246,7 +250,7 @@ func TestCmdStreamBad(t *testing.T) { func TestCmdStreamGood(t *testing.T) { cmd := exec.Command("sh", "-c", "echo hello; exit 0") - out, _, err := cmdStream(cmd, nil) + out, err := cmdStream(cmd, nil) if err != nil { t.Fatal(err) } @@ -1318,3 +1322,38 @@ func readFileFromArchive(t *testing.T, archive io.ReadCloser, name string, expec assert.NoError(t, err) return string(content) } + +func TestDisablePigz(t *testing.T) { + _, err := exec.LookPath("unpigz") + if err != nil { + t.Log("Test will not check full path when Pigz not installed") + } + + os.Setenv("MOBY_DISABLE_PIGZ", "true") + defer os.Unsetenv("MOBY_DISABLE_PIGZ") + + r := testDecompressStream(t, "gz", "gzip -f") + // For the bufio pool + outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) + // For the context canceller + contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + + assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) +} + +func TestPigz(t *testing.T) { + r := testDecompressStream(t, "gz", "gzip -f") + // For the bufio pool + outsideReaderCloserWrapper := r.(*ioutils.ReadCloserWrapper) + // For the context canceller + contextReaderCloserWrapper := outsideReaderCloserWrapper.Reader.(*ioutils.ReadCloserWrapper) + + _, err := exec.LookPath("unpigz") + if err == nil { + t.Log("Tested whether Pigz is used, as it installed") + assert.IsType(t, &io.PipeReader{}, contextReaderCloserWrapper.Reader) + } else { + t.Log("Tested whether Pigz is not used, as it not installed") + assert.IsType(t, &gzip.Reader{}, contextReaderCloserWrapper.Reader) + } +} diff --git a/components/engine/pkg/ioutils/readers.go b/components/engine/pkg/ioutils/readers.go index 63f3c07f46..168fa1d2d0 100644 --- a/components/engine/pkg/ioutils/readers.go +++ b/components/engine/pkg/ioutils/readers.go @@ -8,18 +8,22 @@ import ( "golang.org/x/net/context" ) -type readCloserWrapper struct { +// ReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser +// It calls the given callback function when closed. It should be constructed +// with NewReadCloserWrapper +type ReadCloserWrapper struct { io.Reader closer func() error } -func (r *readCloserWrapper) Close() error { +// Close calls back the passed closer function +func (r *ReadCloserWrapper) Close() error { return r.closer() } // NewReadCloserWrapper returns a new io.ReadCloser. func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { - return &readCloserWrapper{ + return &ReadCloserWrapper{ Reader: r, closer: closer, } From 5a20e1240ca29e681b089587aac8cfd1c754b4ca Mon Sep 17 00:00:00 2001 From: John Howard Date: Tue, 16 Jan 2018 11:31:29 -0800 Subject: [PATCH 2/4] LCOW: Fix OpenFile parameters Signed-off-by: John Howard Upstream-commit: 141b9a74716c016029badf16aca21dc96975aaac Component: engine --- components/engine/daemon/graphdriver/lcow/remotefs_file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/engine/daemon/graphdriver/lcow/remotefs_file.go b/components/engine/daemon/graphdriver/lcow/remotefs_file.go index c13431973c..138cea3c78 100644 --- a/components/engine/daemon/graphdriver/lcow/remotefs_file.go +++ b/components/engine/daemon/graphdriver/lcow/remotefs_file.go @@ -33,7 +33,7 @@ func (l *lcowfs) OpenFile(path string, flag int, perm os.FileMode) (_ driver.Fil flagStr := strconv.FormatInt(int64(flag), 10) permStr := strconv.FormatUint(uint64(perm), 8) - commandLine := fmt.Sprintf("%s %s %s %s", remotefs.RemotefsCmd, remotefs.OpenFileCmd, flagStr, permStr) + commandLine := fmt.Sprintf("%s %s %s %s %s", remotefs.RemotefsCmd, remotefs.OpenFileCmd, path, flagStr, permStr) env := make(map[string]string) env["PATH"] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:" processConfig := &hcsshim.ProcessConfig{ From 2012c45c5a8d6b12d3b11cbdd40e7a470b8c8678 Mon Sep 17 00:00:00 2001 From: Abhinandan Prativadi Date: Mon, 8 Jan 2018 14:25:50 -0800 Subject: [PATCH 3/4] Disable service on release network This PR contains a fix for moby/moby#30321. There was a moby/moby#31142 PR intending to fix the issue by adding a delay between disabling the service in the cluster and the shutdown of the tasks. However disabling the service was not deleting the service info in the cluster. Added a fix to delete service info from cluster and verified using siege to ensure there is zero downtime on rolling update of a service.In order to support it and ensure consitency of enabling and disable service knob from the daemon, we need to ensure we disable service when we release the network from the container. This helps in making the enable and disable service less racy. The corresponding part of libnetwork fix is part of docker/libnetwork#1824 Signed-off-by: abhi Upstream-commit: a042e5a20a7801efc936daf7a639487bb37ca966 Component: engine --- components/engine/daemon/container_operations.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/components/engine/daemon/container_operations.go b/components/engine/daemon/container_operations.go index 1a1c7c4b3f..3900e95e8d 100644 --- a/components/engine/daemon/container_operations.go +++ b/components/engine/daemon/container_operations.go @@ -966,6 +966,9 @@ func (daemon *Daemon) releaseNetwork(container *container.Container) { logrus.Warnf("error locating sandbox id %s: %v", sid, err) return } + if err := sb.DisableService(); err != nil { + logrus.WithFields(logrus.Fields{"container": container.ID, "sandbox": sid}).WithError(err).Error("Error removing service from sandbox") + } if err := sb.Delete(); err != nil { logrus.Errorf("Error deleting sandbox id %s for container %s: %v", sid, container.ID, err) From 2b31b5d6e4259d64cf1da1c7a78784c20896af66 Mon Sep 17 00:00:00 2001 From: Abhinandan Prativadi Date: Mon, 8 Jan 2018 15:10:34 -0800 Subject: [PATCH 4/4] libnetwork vendor Signed-off-by: abhi Upstream-commit: dad093cc34497bb7912c769e469a3e79a0c2c63c Component: engine --- components/engine/vendor.conf | 2 +- .../github.com/docker/libnetwork/endpoint.go | 29 +++++++++++-------- .../github.com/docker/libnetwork/sandbox.go | 27 +++++++++++++---- 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/components/engine/vendor.conf b/components/engine/vendor.conf index 31630c0dba..855858f539 100644 --- a/components/engine/vendor.conf +++ b/components/engine/vendor.conf @@ -30,7 +30,7 @@ github.com/moby/buildkit aaff9d591ef128560018433fe61beb802e149de8 github.com/tonistiigi/fsutil dea3a0da73aee887fc02142d995be764106ac5e2 #get libnetwork packages -github.com/docker/libnetwork a1dfea384b39779552a3b4837ea9303194950976 +github.com/docker/libnetwork 315a076a4e9ded2abc950318c71d5f1637547977 github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/components/engine/vendor/github.com/docker/libnetwork/endpoint.go b/components/engine/vendor/github.com/docker/libnetwork/endpoint.go index a2d1dbc4c6..1e1b6a1675 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/endpoint.go +++ b/components/engine/vendor/github.com/docker/libnetwork/endpoint.go @@ -311,16 +311,25 @@ func (ep *endpoint) isAnonymous() bool { return ep.anonymous } -// enableService sets ep's serviceEnabled to the passed value if it's not in the -// current state and returns true; false otherwise. -func (ep *endpoint) enableService(state bool) bool { +// isServiceEnabled check if service is enabled on the endpoint +func (ep *endpoint) isServiceEnabled() bool { ep.Lock() defer ep.Unlock() - if ep.serviceEnabled != state { - ep.serviceEnabled = state - return true - } - return false + return ep.serviceEnabled +} + +// enableService sets service enabled on the endpoint +func (ep *endpoint) enableService() { + ep.Lock() + defer ep.Unlock() + ep.serviceEnabled = true +} + +// disableService disables service on the endpoint +func (ep *endpoint) disableService() { + ep.Lock() + defer ep.Unlock() + ep.serviceEnabled = false } func (ep *endpoint) needResolver() bool { @@ -759,10 +768,6 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption) return err } - if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil { - logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e) - } - if e := ep.deleteDriverInfoFromCluster(); e != nil { logrus.Errorf("Could not delete endpoint state for endpoint %s from cluster: %v", ep.Name(), e) } diff --git a/components/engine/vendor/github.com/docker/libnetwork/sandbox.go b/components/engine/vendor/github.com/docker/libnetwork/sandbox.go index 315195ebb8..423066c128 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/sandbox.go +++ b/components/engine/vendor/github.com/docker/libnetwork/sandbox.go @@ -674,24 +674,41 @@ func (sb *sandbox) SetKey(basePath string) error { return nil } -func (sb *sandbox) EnableService() error { +func (sb *sandbox) EnableService() (err error) { logrus.Debugf("EnableService %s START", sb.containerID) + defer func() { + if err != nil { + sb.DisableService() + } + }() for _, ep := range sb.getConnectedEndpoints() { - if ep.enableService(true) { + if !ep.isServiceEnabled() { if err := ep.addServiceInfoToCluster(sb); err != nil { - ep.enableService(false) return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err) } + ep.enableService() } } logrus.Debugf("EnableService %s DONE", sb.containerID) return nil } -func (sb *sandbox) DisableService() error { +func (sb *sandbox) DisableService() (err error) { logrus.Debugf("DisableService %s START", sb.containerID) + failedEps := []string{} + defer func() { + if len(failedEps) > 0 { + err = fmt.Errorf("failed to disable service on sandbox:%s, for endpoints %s", sb.ID(), strings.Join(failedEps, ",")) + } + }() for _, ep := range sb.getConnectedEndpoints() { - ep.enableService(false) + if ep.isServiceEnabled() { + if err := ep.deleteServiceInfoFromCluster(sb, "DisableService"); err != nil { + failedEps = append(failedEps, ep.Name()) + logrus.Warnf("failed update state for endpoint %s into cluster: %v", ep.Name(), err) + } + ep.disableService() + } } logrus.Debugf("DisableService %s DONE", sb.containerID) return nil