Merge component 'engine' from git@github.com:moby/moby master

This commit is contained in:
GordonTheTurtle
2018-05-02 17:07:13 +00:00
5 changed files with 195 additions and 25 deletions

View File

@ -226,7 +226,7 @@ func (b *Builder) build(source builder.Source, dockerfile *parser.Result) (*buil
targetIx, found := instructions.HasStage(stages, b.options.Target)
if !found {
buildsFailed.WithValues(metricsBuildTargetNotReachableError).Inc()
return nil, errors.Errorf("failed to reach build target %s in Dockerfile", b.options.Target)
return nil, errdefs.InvalidParameter(errors.Errorf("failed to reach build target %s in Dockerfile", b.options.Target))
}
stages = stages[:targetIx+1]
}

View File

@ -390,7 +390,8 @@ func CurrentStage(s []Stage) (*Stage, error) {
// HasStage looks for the presence of a given stage name
func HasStage(s []Stage, name string) (int, bool) {
for i, stage := range s {
if stage.Name == name {
// Stage name is case-insensitive by design
if strings.EqualFold(stage.Name, name) {
return i, true
}
}

View File

@ -61,6 +61,7 @@ type logStream struct {
logStreamName string
logGroupName string
logCreateGroup bool
logNonBlocking bool
multilinePattern *regexp.Regexp
client api
messages chan *logger.Message
@ -129,6 +130,8 @@ func New(info logger.Info) (logger.Logger, error) {
}
}
logNonBlocking := info.Config["mode"] == "non-blocking"
if info.Config[logStreamKey] != "" {
logStreamName = info.Config[logStreamKey]
}
@ -142,19 +145,54 @@ func New(info logger.Info) (logger.Logger, error) {
if err != nil {
return nil, err
}
containerStream := &logStream{
logStreamName: logStreamName,
logGroupName: logGroupName,
logCreateGroup: logCreateGroup,
logNonBlocking: logNonBlocking,
multilinePattern: multilinePattern,
client: client,
messages: make(chan *logger.Message, 4096),
}
err = containerStream.create()
if err != nil {
return nil, err
creationDone := make(chan bool)
if logNonBlocking {
go func() {
backoff := 1
maxBackoff := 32
for {
// If logger is closed we are done
containerStream.lock.RLock()
if containerStream.closed {
containerStream.lock.RUnlock()
break
}
containerStream.lock.RUnlock()
err := containerStream.create()
if err == nil {
break
}
time.Sleep(time.Duration(backoff) * time.Second)
if backoff < maxBackoff {
backoff *= 2
}
logrus.
WithError(err).
WithField("container-id", info.ContainerID).
WithField("container-name", info.ContainerName).
Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
}
close(creationDone)
}()
} else {
if err = containerStream.create(); err != nil {
return nil, err
}
close(creationDone)
}
go containerStream.collectBatch()
go containerStream.collectBatch(creationDone)
return containerStream, nil
}
@ -296,9 +334,18 @@ func (l *logStream) BufSize() int {
func (l *logStream) Log(msg *logger.Message) error {
l.lock.RLock()
defer l.lock.RUnlock()
if !l.closed {
l.messages <- msg
if l.closed {
return errors.New("awslogs is closed")
}
if l.logNonBlocking {
select {
case l.messages <- msg:
return nil
default:
return errors.New("awslogs buffer is full")
}
}
l.messages <- msg
return nil
}
@ -324,7 +371,9 @@ func (l *logStream) create() error {
return l.createLogStream()
}
}
return err
if err != nil {
return err
}
}
return nil
@ -401,7 +450,9 @@ var newTicker = func(freq time.Duration) *time.Ticker {
// seconds. When events are ready to be processed for submission to CloudWatch
// Logs, the processEvents method is called. If a multiline pattern is not
// configured, log events are submitted to the processEvents method immediately.
func (l *logStream) collectBatch() {
func (l *logStream) collectBatch(created chan bool) {
// Wait for the logstream/group to be created
<-created
ticker := newTicker(batchPublishFrequency)
var eventBuffer []byte
var eventBufferTimestamp int64

View File

@ -201,6 +201,93 @@ func TestCreateAlreadyExists(t *testing.T) {
}
}
func TestLogClosed(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
closed: true,
}
err := stream.Log(&logger.Message{})
if err == nil {
t.Fatal("Expected non-nil error")
}
}
func TestLogBlocking(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message),
}
errorCh := make(chan error, 1)
started := make(chan bool)
go func() {
started <- true
err := stream.Log(&logger.Message{})
errorCh <- err
}()
<-started
select {
case err := <-errorCh:
t.Fatal("Expected stream.Log to block: ", err)
default:
break
}
select {
case <-stream.messages:
break
default:
t.Fatal("Expected to be able to read from stream.messages but was unable to")
}
select {
case err := <-errorCh:
if err != nil {
t.Fatal(err)
}
case <-time.After(30 * time.Second):
t.Fatal("timed out waiting for read")
}
}
func TestLogNonBlockingBufferEmpty(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
}
err := stream.Log(&logger.Message{})
if err != nil {
t.Fatal(err)
}
}
func TestLogNonBlockingBufferFull(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
messages: make(chan *logger.Message, 1),
logNonBlocking: true,
}
stream.messages <- &logger.Message{}
errorCh := make(chan error)
started := make(chan bool)
go func() {
started <- true
err := stream.Log(&logger.Message{})
errorCh <- err
}()
<-started
select {
case err := <-errorCh:
if err == nil {
t.Fatal("Expected non-nil error")
}
case <-time.After(30 * time.Second):
t.Fatal("Expected Log call to not block")
}
}
func TestPublishBatchSuccess(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
@ -410,8 +497,9 @@ func TestCollectBatchSimple(t *testing.T) {
C: ticks,
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.Log(&logger.Message{
Line: []byte(logline),
@ -454,7 +542,9 @@ func TestCollectBatchTicker(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.Log(&logger.Message{
Line: []byte(logline + " 1"),
@ -526,7 +616,9 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.Log(&logger.Message{
Line: []byte(logline),
@ -580,7 +672,9 @@ func BenchmarkCollectBatch(b *testing.B) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.logGenerator(10, 100)
ticks <- time.Time{}
stream.Close()
@ -610,7 +704,9 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
C: ticks,
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.logGenerator(10, 100)
ticks <- time.Time{}
stream.Close()
@ -640,7 +736,9 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.Log(&logger.Message{
Line: []byte(logline),
@ -702,7 +800,9 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.Log(&logger.Message{
Line: []byte(logline),
@ -750,7 +850,9 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
// Log max event size
longline := strings.Repeat("A", maximumBytesPerEvent)
@ -801,7 +903,9 @@ func TestCollectBatchClose(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
stream.Log(&logger.Message{
Line: []byte(logline),
@ -844,7 +948,9 @@ func TestCollectBatchLineSplit(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
longline := strings.Repeat("A", maximumBytesPerEvent)
stream.Log(&logger.Message{
@ -891,7 +997,9 @@ func TestCollectBatchMaxEvents(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
line := "A"
for i := 0; i <= maximumLogEventsPerPut; i++ {
@ -946,7 +1054,9 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
// maxline is the maximum line that could be submitted after
@ -1025,7 +1135,9 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
}
}
go stream.collectBatch()
d := make(chan bool)
close(d)
go stream.collectBatch(d)
times := maximumLogEventsPerPut
expectedEvents := []*cloudwatchlogs.InputLogEvent{}

View File

@ -5965,10 +5965,16 @@ func (s *DockerSuite) TestBuildIntermediateTarget(c *check.C) {
cli.BuildCmd(c, "build1", build.WithExternalBuildContext(ctx),
cli.WithFlags("--target", "build-env"))
//res := inspectFieldJSON(c, "build1", "Config.Cmd")
res := cli.InspectCmd(c, "build1", cli.Format("json .Config.Cmd")).Combined()
c.Assert(strings.TrimSpace(res), checker.Equals, `["/dev"]`)
// Stage name is case-insensitive by design
cli.BuildCmd(c, "build1", build.WithExternalBuildContext(ctx),
cli.WithFlags("--target", "BUIld-EnV"))
res = cli.InspectCmd(c, "build1", cli.Format("json .Config.Cmd")).Combined()
c.Assert(strings.TrimSpace(res), checker.Equals, `["/dev"]`)
result := cli.Docker(cli.Build("build1"), build.WithExternalBuildContext(ctx),
cli.WithFlags("--target", "nosuchtarget"))
result.Assert(c, icmd.Expected{