Improved push and pull with upload manager and download manager

This commit adds a transfer manager which deduplicates and schedules
transfers, and also an upload manager and download manager that build on
top of the transfer manager to provide high-level interfaces for uploads
and downloads. The push and pull code is modified to use these building
blocks.

Some benefits of the changes:

- Simplification of push/pull code
- Pushes can upload layers concurrently
- Failed downloads and uploads are retried after backoff delays
- Cancellation is supported, but individual transfers will only be
  cancelled if all pushes or pulls using them are cancelled.
- The distribution code is decoupled from Docker Engine packages and API
  conventions (i.e. streamformatter), which will make it easier to split
  out.

This commit also includes unit tests for the new distribution/xfer
package. The tests cover 87.8% of the statements in the package.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
Upstream-commit: 572ce802306a4e919802e5b77cbeca94acda7c0a
Component: engine
This commit is contained in:
Aaron Lehmann
2015-11-13 16:59:01 -08:00
parent e5a75c852e
commit 547342d777
36 changed files with 2675 additions and 1127 deletions

View File

@ -0,0 +1,63 @@
package progress
import (
"fmt"
)
// Progress represents the progress of a transfer.
type Progress struct {
ID string
// Progress contains a Message or...
Message string
// ...progress of an action
Action string
Current int64
Total int64
LastUpdate bool
}
// Output is an interface for writing progress information. It's
// like a writer for progress, but we don't call it Writer because
// that would be confusing next to ProgressReader (also, because it
// doesn't implement the io.Writer interface).
type Output interface {
WriteProgress(Progress) error
}
type chanOutput chan<- Progress
func (out chanOutput) WriteProgress(p Progress) error {
out <- p
return nil
}
// ChanOutput returns a Output that writes progress updates to the
// supplied channel.
func ChanOutput(progressChan chan<- Progress) Output {
return chanOutput(progressChan)
}
// Update is a convenience function to write a progress update to the channel.
func Update(out Output, id, action string) {
out.WriteProgress(Progress{ID: id, Action: action})
}
// Updatef is a convenience function to write a printf-formatted progress update
// to the channel.
func Updatef(out Output, id, format string, a ...interface{}) {
Update(out, id, fmt.Sprintf(format, a...))
}
// Message is a convenience function to write a progress message to the channel.
func Message(out Output, id, message string) {
out.WriteProgress(Progress{ID: id, Message: message})
}
// Messagef is a convenience function to write a printf-formatted progress
// message to the channel.
func Messagef(out Output, id, format string, a ...interface{}) {
Message(out, id, fmt.Sprintf(format, a...))
}

View File

@ -0,0 +1,59 @@
package progress
import (
"io"
)
// Reader is a Reader with progress bar.
type Reader struct {
in io.ReadCloser // Stream to read from
out Output // Where to send progress bar to
size int64
current int64
lastUpdate int64
id string
action string
}
// NewProgressReader creates a new ProgressReader.
func NewProgressReader(in io.ReadCloser, out Output, size int64, id, action string) *Reader {
return &Reader{
in: in,
out: out,
size: size,
id: id,
action: action,
}
}
func (p *Reader) Read(buf []byte) (n int, err error) {
read, err := p.in.Read(buf)
p.current += int64(read)
updateEvery := int64(1024 * 512) //512kB
if p.size > 0 {
// Update progress for every 1% read if 1% < 512kB
if increment := int64(0.01 * float64(p.size)); increment < updateEvery {
updateEvery = increment
}
}
if p.current-p.lastUpdate > updateEvery || err != nil {
p.updateProgress(err != nil && read == 0)
p.lastUpdate = p.current
}
return read, err
}
// Close closes the progress reader and its underlying reader.
func (p *Reader) Close() error {
if p.current < p.size {
// print a full progress bar when closing prematurely
p.current = p.size
p.updateProgress(false)
}
return p.in.Close()
}
func (p *Reader) updateProgress(last bool) {
p.out.WriteProgress(Progress{ID: p.id, Action: p.action, Current: p.current, Total: p.size, LastUpdate: last})
}

View File

@ -0,0 +1,75 @@
package progress
import (
"bytes"
"io"
"io/ioutil"
"testing"
)
func TestOutputOnPrematureClose(t *testing.T) {
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
progressChan := make(chan Progress, 10)
pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read")
part := make([]byte, 4, 4)
_, err := io.ReadFull(pr, part)
if err != nil {
pr.Close()
t.Fatal(err)
}
drainLoop:
for {
select {
case <-progressChan:
default:
break drainLoop
}
}
pr.Close()
select {
case <-progressChan:
default:
t.Fatalf("Expected some output when closing prematurely")
}
}
func TestCompleteSilently(t *testing.T) {
content := []byte("TESTING")
reader := ioutil.NopCloser(bytes.NewReader(content))
progressChan := make(chan Progress, 10)
pr := NewProgressReader(reader, ChanOutput(progressChan), int64(len(content)), "Test", "Read")
out, err := ioutil.ReadAll(pr)
if err != nil {
pr.Close()
t.Fatal(err)
}
if string(out) != "TESTING" {
pr.Close()
t.Fatalf("Unexpected output %q from reader", string(out))
}
drainLoop:
for {
select {
case <-progressChan:
default:
break drainLoop
}
}
pr.Close()
select {
case <-progressChan:
t.Fatalf("Should have closed silently when read is complete")
default:
}
}