From 257c410f05bf1089525829872a3cf1846e7a8a2b Mon Sep 17 00:00:00 2001 From: Cody Roseborough Date: Fri, 2 Mar 2018 18:52:15 +0000 Subject: [PATCH 1/3] Allow awslogs to use non-blocking mode When then non-blocking mode is specified, awslogs will: - No longer potentially block calls to logstream.Log(), instead will return an error if the awslogs buffer is full. This has the effect of dropping log messages sent to awslogs.Log() that are made while the buffer is full. - Wait to initialize the log stream until the first Log() call instead of in New(). This has the effect of allowing the container to start in the case where Cloudwatch Logs is unreachable. Both of these changes require the --log-opt mode=non-blocking to be explicitly set and do not modify the default behavior. Signed-off-by: Cody Roseborough Upstream-commit: c7e379988c9cd6ec0af528e6f59eea3c51b36738 Component: engine --- .../daemon/logger/awslogs/cloudwatchlogs.go | 67 ++++++++- .../logger/awslogs/cloudwatchlogs_test.go | 140 ++++++++++++++++-- 2 files changed, 185 insertions(+), 22 deletions(-) diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go index 835379b3b4..d17a9e7896 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go @@ -61,6 +61,7 @@ type logStream struct { logStreamName string logGroupName string logCreateGroup bool + logNonBlocking bool multilinePattern *regexp.Regexp client api messages chan *logger.Message @@ -127,6 +128,8 @@ func New(info logger.Info) (logger.Logger, error) { } } + logNonBlocking := info.Config["mode"] == "non-blocking" + if info.Config[logStreamKey] != "" { logStreamName = info.Config[logStreamKey] } @@ -140,19 +143,54 @@ func New(info logger.Info) (logger.Logger, error) { if err != nil { return nil, err } + containerStream := &logStream{ logStreamName: logStreamName, logGroupName: logGroupName, logCreateGroup: logCreateGroup, + logNonBlocking: logNonBlocking, multilinePattern: multilinePattern, client: client, messages: make(chan *logger.Message, 4096), } - err = containerStream.create() - if err != nil { - return nil, err + + creationDone := make(chan bool) + if logNonBlocking { + go func() { + backoff := 1 + maxBackoff := 32 + for { + // If logger is closed we are done + containerStream.lock.RLock() + if containerStream.closed { + containerStream.lock.RUnlock() + break + } + containerStream.lock.RUnlock() + err := containerStream.create() + if err == nil { + break + } + + time.Sleep(time.Duration(backoff) * time.Second) + if backoff < maxBackoff { + backoff *= 2 + } + logrus. + WithError(err). + WithField("container-id", info.ContainerID). + WithField("container-name", info.ContainerName). + Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds") + } + close(creationDone) + }() + } else { + if err = containerStream.create(); err != nil { + return nil, err + } + close(creationDone) } - go containerStream.collectBatch() + go containerStream.collectBatch(creationDone) return containerStream, nil } @@ -294,9 +332,18 @@ func (l *logStream) BufSize() int { func (l *logStream) Log(msg *logger.Message) error { l.lock.RLock() defer l.lock.RUnlock() - if !l.closed { - l.messages <- msg + if l.closed { + return errors.New("awslogs is closed") } + if l.logNonBlocking { + select { + case l.messages <- msg: + return nil + default: + return errors.New("awslogs buffer is full") + } + } + l.messages <- msg return nil } @@ -322,7 +369,9 @@ func (l *logStream) create() error { return l.createLogStream() } } - return err + if err != nil { + return err + } } return nil @@ -399,7 +448,9 @@ var newTicker = func(freq time.Duration) *time.Ticker { // seconds. When events are ready to be processed for submission to CloudWatch // Logs, the processEvents method is called. If a multiline pattern is not // configured, log events are submitted to the processEvents method immediately. -func (l *logStream) collectBatch() { +func (l *logStream) collectBatch(created chan bool) { + // Wait for the logstream/group to be created + <-created ticker := newTicker(batchPublishFrequency) var eventBuffer []byte var eventBufferTimestamp int64 diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go index 080157b2ea..a988d0e75f 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -200,6 +200,93 @@ func TestCreateAlreadyExists(t *testing.T) { } } +func TestLogClosed(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + closed: true, + } + err := stream.Log(&logger.Message{}) + if err == nil { + t.Fatal("Expected non-nil error") + } +} + +func TestLogBlocking(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + messages: make(chan *logger.Message), + } + + errorCh := make(chan error, 1) + started := make(chan bool) + go func() { + started <- true + err := stream.Log(&logger.Message{}) + errorCh <- err + }() + <-started + select { + case err := <-errorCh: + t.Fatal("Expected stream.Log to block: ", err) + default: + break + } + select { + case <-stream.messages: + break + default: + t.Fatal("Expected to be able to read from stream.messages but was unable to") + } + select { + case err := <-errorCh: + if err != nil { + t.Fatal(err) + } + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for read") + } +} + +func TestLogNonBlockingBufferEmpty(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + messages: make(chan *logger.Message, 1), + logNonBlocking: true, + } + err := stream.Log(&logger.Message{}) + if err != nil { + t.Fatal(err) + } +} + +func TestLogNonBlockingBufferFull(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + messages: make(chan *logger.Message, 1), + logNonBlocking: true, + } + stream.messages <- &logger.Message{} + errorCh := make(chan error) + started := make(chan bool) + go func() { + started <- true + err := stream.Log(&logger.Message{}) + errorCh <- err + }() + <-started + select { + case err := <-errorCh: + if err == nil { + t.Fatal("Expected non-nil error") + } + case <-time.After(30 * time.Second): + t.Fatal("Expected Log call to not block") + } +} func TestPublishBatchSuccess(t *testing.T) { mockClient := newMockClient() stream := &logStream{ @@ -409,8 +496,9 @@ func TestCollectBatchSimple(t *testing.T) { C: ticks, } } - - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), @@ -453,7 +541,9 @@ func TestCollectBatchTicker(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline + " 1"), @@ -525,7 +615,9 @@ func TestCollectBatchMultilinePattern(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), @@ -579,7 +671,9 @@ func BenchmarkCollectBatch(b *testing.B) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.logGenerator(10, 100) ticks <- time.Time{} stream.Close() @@ -609,7 +703,9 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) { C: ticks, } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.logGenerator(10, 100) ticks <- time.Time{} stream.Close() @@ -639,7 +735,9 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), @@ -701,7 +799,9 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), @@ -749,7 +849,9 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) // Log max event size longline := strings.Repeat("A", maximumBytesPerEvent) @@ -800,7 +902,9 @@ func TestCollectBatchClose(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) stream.Log(&logger.Message{ Line: []byte(logline), @@ -843,7 +947,9 @@ func TestCollectBatchLineSplit(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) longline := strings.Repeat("A", maximumBytesPerEvent) stream.Log(&logger.Message{ @@ -890,7 +996,9 @@ func TestCollectBatchMaxEvents(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) line := "A" for i := 0; i <= maximumLogEventsPerPut; i++ { @@ -945,7 +1053,9 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes) // maxline is the maximum line that could be submitted after @@ -1024,7 +1134,9 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) { } } - go stream.collectBatch() + d := make(chan bool) + close(d) + go stream.collectBatch(d) times := maximumLogEventsPerPut expectedEvents := []*cloudwatchlogs.InputLogEvent{} From 4c5e76b26e2a1305f96cf27b0d7fabd6e0315058 Mon Sep 17 00:00:00 2001 From: Dennis Chen Date: Fri, 27 Apr 2018 18:40:59 +0800 Subject: [PATCH 2/3] Fix the target name issue for multi-stage build This PR is trying to fix issue #36956. The stage name is case-insensitive by design, so we should use `strings.EqualFold()` as the comparison method to eliminate the case sensitive noise. Also we need to return a pre-defined error code order to avoid below message like: "FIXME: Got an API for which error does not match any expected type!!!: failed to reach build target dev in Dockerfile" Signed-off-by: Dennis Chen Upstream-commit: 7c0570473cfa181aeb3278072cc9af4f9298cb98 Component: engine --- components/engine/builder/dockerfile/builder.go | 2 +- components/engine/builder/dockerfile/instructions/commands.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/components/engine/builder/dockerfile/builder.go b/components/engine/builder/dockerfile/builder.go index cdc75c3c32..cd2fa7c56b 100644 --- a/components/engine/builder/dockerfile/builder.go +++ b/components/engine/builder/dockerfile/builder.go @@ -226,7 +226,7 @@ func (b *Builder) build(source builder.Source, dockerfile *parser.Result) (*buil targetIx, found := instructions.HasStage(stages, b.options.Target) if !found { buildsFailed.WithValues(metricsBuildTargetNotReachableError).Inc() - return nil, errors.Errorf("failed to reach build target %s in Dockerfile", b.options.Target) + return nil, errdefs.InvalidParameter(errors.Errorf("failed to reach build target %s in Dockerfile", b.options.Target)) } stages = stages[:targetIx+1] } diff --git a/components/engine/builder/dockerfile/instructions/commands.go b/components/engine/builder/dockerfile/instructions/commands.go index a10140cf04..9d864e5325 100644 --- a/components/engine/builder/dockerfile/instructions/commands.go +++ b/components/engine/builder/dockerfile/instructions/commands.go @@ -390,7 +390,8 @@ func CurrentStage(s []Stage) (*Stage, error) { // HasStage looks for the presence of a given stage name func HasStage(s []Stage, name string) (int, bool) { for i, stage := range s { - if stage.Name == name { + // Stage name is case-insensitive by design + if strings.EqualFold(stage.Name, name) { return i, true } } From 0e7b49da2ede169bc4a72c6a5a0289984034f445 Mon Sep 17 00:00:00 2001 From: Dennis Chen Date: Sat, 28 Apr 2018 15:45:23 +0800 Subject: [PATCH 3/3] Add `--target` name case sensitive test code for multi-stage build Add testing code to cover the `--target` name case sensitive issue reported by issue #36956. Signed-off-by: Dennis Chen Upstream-commit: a95fabc70efacc1ff6c0c62c490cf551215bc503 Component: engine --- .../engine/integration-cli/docker_cli_build_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/components/engine/integration-cli/docker_cli_build_test.go b/components/engine/integration-cli/docker_cli_build_test.go index cd57d591f2..2ed5e44d8f 100644 --- a/components/engine/integration-cli/docker_cli_build_test.go +++ b/components/engine/integration-cli/docker_cli_build_test.go @@ -5965,10 +5965,16 @@ func (s *DockerSuite) TestBuildIntermediateTarget(c *check.C) { cli.BuildCmd(c, "build1", build.WithExternalBuildContext(ctx), cli.WithFlags("--target", "build-env")) - //res := inspectFieldJSON(c, "build1", "Config.Cmd") res := cli.InspectCmd(c, "build1", cli.Format("json .Config.Cmd")).Combined() c.Assert(strings.TrimSpace(res), checker.Equals, `["/dev"]`) + // Stage name is case-insensitive by design + cli.BuildCmd(c, "build1", build.WithExternalBuildContext(ctx), + cli.WithFlags("--target", "BUIld-EnV")) + + res = cli.InspectCmd(c, "build1", cli.Format("json .Config.Cmd")).Combined() + c.Assert(strings.TrimSpace(res), checker.Equals, `["/dev"]`) + result := cli.Docker(cli.Build("build1"), build.WithExternalBuildContext(ctx), cli.WithFlags("--target", "nosuchtarget")) result.Assert(c, icmd.Expected{