From 257c410f05bf1089525829872a3cf1846e7a8a2b Mon Sep 17 00:00:00 2001
From: Cody Roseborough
Date: Fri, 2 Mar 2018 18:52:15 +0000
Subject: [PATCH] Allow awslogs to use non-blocking mode

When non-blocking mode is specified, awslogs will:

- No longer potentially block calls to logstream.Log(); instead, it
  returns an error if the awslogs buffer is full. This has the effect
  of dropping log messages sent to awslogs.Log() while the buffer is
  full.

- Wait to initialize the log stream until the first Log() call instead
  of in New(). This has the effect of allowing the container to start
  even when CloudWatch Logs is unreachable.

Both of these changes require --log-opt mode=non-blocking to be
explicitly set and do not modify the default behavior.

Signed-off-by: Cody Roseborough
Upstream-commit: c7e379988c9cd6ec0af528e6f59eea3c51b36738
Component: engine
---
 .../daemon/logger/awslogs/cloudwatchlogs.go   |  67 ++++++++-
 .../logger/awslogs/cloudwatchlogs_test.go     | 140 ++++++++++++++++--
 2 files changed, 185 insertions(+), 22 deletions(-)

diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go
index 835379b3b4..d17a9e7896 100644
--- a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go
+++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go
@@ -61,6 +61,7 @@ type logStream struct {
 	logStreamName    string
 	logGroupName     string
 	logCreateGroup   bool
+	logNonBlocking   bool
 	multilinePattern *regexp.Regexp
 	client           api
 	messages         chan *logger.Message
@@ -127,6 +128,8 @@ func New(info logger.Info) (logger.Logger, error) {
 		}
 	}
 
+	logNonBlocking := info.Config["mode"] == "non-blocking"
+
 	if info.Config[logStreamKey] != "" {
 		logStreamName = info.Config[logStreamKey]
 	}
@@ -140,19 +143,54 @@ func New(info logger.Info) (logger.Logger, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	containerStream := &logStream{
 		logStreamName:    logStreamName,
 		logGroupName:     logGroupName,
 		logCreateGroup:   logCreateGroup,
+		logNonBlocking:   logNonBlocking,
 		multilinePattern: multilinePattern,
 		client:           client,
 		messages:         make(chan *logger.Message, 4096),
 	}
-	err = containerStream.create()
-	if err != nil {
-		return nil, err
+
+	creationDone := make(chan bool)
+	if logNonBlocking {
+		go func() {
+			backoff := 1
+			maxBackoff := 32
+			for {
+				// If logger is closed we are done
+				containerStream.lock.RLock()
+				if containerStream.closed {
+					containerStream.lock.RUnlock()
+					break
+				}
+				containerStream.lock.RUnlock()
+				err := containerStream.create()
+				if err == nil {
+					break
+				}
+
+				time.Sleep(time.Duration(backoff) * time.Second)
+				if backoff < maxBackoff {
+					backoff *= 2
+				}
+				logrus.
+					WithError(err).
+					WithField("container-id", info.ContainerID).
+					WithField("container-name", info.ContainerName).
+					Error("Error while trying to initialize awslogs. Retrying in: ", backoff, " seconds")
+			}
+			close(creationDone)
+		}()
+	} else {
+		if err = containerStream.create(); err != nil {
+			return nil, err
+		}
+		close(creationDone)
 	}
-	go containerStream.collectBatch()
+	go containerStream.collectBatch(creationDone)
 
 	return containerStream, nil
 }
@@ -294,9 +332,18 @@ func (l *logStream) BufSize() int {
 func (l *logStream) Log(msg *logger.Message) error {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
-	if !l.closed {
-		l.messages <- msg
+	if l.closed {
+		return errors.New("awslogs is closed")
 	}
+	if l.logNonBlocking {
+		select {
+		case l.messages <- msg:
+			return nil
+		default:
+			return errors.New("awslogs buffer is full")
+		}
+	}
+	l.messages <- msg
 	return nil
 }
 
@@ -322,7 +369,9 @@
 				return l.createLogStream()
 			}
 		}
-		return err
+		if err != nil {
+			return err
+		}
 	}
 
 	return nil
@@ -399,7 +448,9 @@ var newTicker = func(freq time.Duration) *time.Ticker {
 // seconds. When events are ready to be processed for submission to CloudWatch
 // Logs, the processEvents method is called. If a multiline pattern is not
 // configured, log events are submitted to the processEvents method immediately.
-func (l *logStream) collectBatch() {
+func (l *logStream) collectBatch(created chan bool) {
+	// Wait for the logstream/group to be created
+	<-created
 	ticker := newTicker(batchPublishFrequency)
 	var eventBuffer []byte
 	var eventBufferTimestamp int64
diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go
index 080157b2ea..a988d0e75f 100644
--- a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go
+++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go
@@ -200,6 +200,93 @@ func TestCreateAlreadyExists(t *testing.T) {
 	}
 }
 
+func TestLogClosed(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client: mockClient,
+		closed: true,
+	}
+	err := stream.Log(&logger.Message{})
+	if err == nil {
+		t.Fatal("Expected non-nil error")
+	}
+}
+
+func TestLogBlocking(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:   mockClient,
+		messages: make(chan *logger.Message),
+	}
+
+	errorCh := make(chan error, 1)
+	started := make(chan bool)
+	go func() {
+		started <- true
+		err := stream.Log(&logger.Message{})
+		errorCh <- err
+	}()
+	<-started
+	select {
+	case err := <-errorCh:
+		t.Fatal("Expected stream.Log to block: ", err)
+	default:
+		break
+	}
+	select {
+	case <-stream.messages:
+		break
+	default:
+		t.Fatal("Expected to be able to read from stream.messages but was unable to")
+	}
+	select {
+	case err := <-errorCh:
+		if err != nil {
+			t.Fatal(err)
+		}
+	case <-time.After(30 * time.Second):
+		t.Fatal("timed out waiting for read")
+	}
+}
+
+func TestLogNonBlockingBufferEmpty(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:         mockClient,
+		messages:       make(chan *logger.Message, 1),
+		logNonBlocking: true,
+	}
+	err := stream.Log(&logger.Message{})
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestLogNonBlockingBufferFull(t *testing.T) {
+	mockClient := newMockClient()
+	stream := &logStream{
+		client:         mockClient,
+		messages:       make(chan *logger.Message, 1),
+		logNonBlocking: true,
+	}
+	stream.messages <- &logger.Message{}
+	errorCh := make(chan error)
+	started := make(chan bool)
+	go func() {
+		started <- true
+		err := stream.Log(&logger.Message{})
+		errorCh <- err
+	}()
+	<-started
+	select {
+	case err := <-errorCh:
+		if err == nil {
+			t.Fatal("Expected non-nil error")
+		}
+	case <-time.After(30 * time.Second):
+		t.Fatal("Expected Log call to not block")
+	}
+}
 func TestPublishBatchSuccess(t *testing.T) {
 	mockClient := newMockClient()
 	stream := &logStream{
@@ -409,8 +496,9 @@ func TestCollectBatchSimple(t *testing.T) {
 			C: ticks,
 		}
 	}
-
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	stream.Log(&logger.Message{
 		Line:      []byte(logline),
@@ -453,7 +541,9 @@ func TestCollectBatchTicker(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	stream.Log(&logger.Message{
 		Line:      []byte(logline + " 1"),
@@ -525,7 +615,9 @@ func TestCollectBatchMultilinePattern(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	stream.Log(&logger.Message{
 		Line:      []byte(logline),
@@ -579,7 +671,9 @@ func BenchmarkCollectBatch(b *testing.B) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 	stream.logGenerator(10, 100)
 	ticks <- time.Time{}
 	stream.Close()
@@ -609,7 +703,9 @@ func BenchmarkCollectBatchMultilinePattern(b *testing.B) {
 			C: ticks,
 		}
 	}
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 	stream.logGenerator(10, 100)
 	ticks <- time.Time{}
 	stream.Close()
@@ -639,7 +735,9 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	stream.Log(&logger.Message{
 		Line:      []byte(logline),
@@ -701,7 +799,9 @@ func TestCollectBatchMultilinePatternNegativeEventAge(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	stream.Log(&logger.Message{
 		Line:      []byte(logline),
@@ -749,7 +849,9 @@ func TestCollectBatchMultilinePatternMaxEventSize(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	// Log max event size
 	longline := strings.Repeat("A", maximumBytesPerEvent)
@@ -800,7 +902,9 @@ func TestCollectBatchClose(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	stream.Log(&logger.Message{
 		Line:      []byte(logline),
@@ -843,7 +947,9 @@ func TestCollectBatchLineSplit(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	longline := strings.Repeat("A", maximumBytesPerEvent)
 	stream.Log(&logger.Message{
@@ -890,7 +996,9 @@ func TestCollectBatchMaxEvents(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	line := "A"
 	for i := 0; i <= maximumLogEventsPerPut; i++ {
@@ -945,7 +1053,9 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	numPayloads := maximumBytesPerPut / (maximumBytesPerEvent + perEventBytes)
 	// maxline is the maximum line that could be submitted after
@@ -1024,7 +1134,9 @@ func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
 		}
 	}
 
-	go stream.collectBatch()
+	d := make(chan bool)
+	close(d)
+	go stream.collectBatch(d)
 
 	times := maximumLogEventsPerPut
 	expectedEvents := []*cloudwatchlogs.InputLogEvent{}
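
Supplementary illustration (not part of the patch): the non-blocking behavior above reduces to the bounded channel plus select/default send that the patch adds to logStream.Log(). The standalone Go sketch below isolates that pattern so it can be read and run outside the driver; message, nonBlockingBuffer, and errBufferFull are hypothetical names invented for this sketch and do not appear in the awslogs code.

package main

import (
	"errors"
	"fmt"
)

// message stands in for *logger.Message; the real driver buffers
// *logger.Message values, this sketch just buffers strings.
type message string

var errBufferFull = errors.New("awslogs buffer is full")

// nonBlockingBuffer mimics the shape of the patched logStream.Log:
// a bounded channel plus a select with a default case, so a full
// buffer returns an error instead of blocking the caller.
type nonBlockingBuffer struct {
	messages chan message
}

// Log tries to enqueue msg without blocking. If the channel has room
// the send succeeds; otherwise the message is dropped and an error is
// returned, which is what the non-blocking branch in the driver does.
func (b *nonBlockingBuffer) Log(msg message) error {
	select {
	case b.messages <- msg:
		return nil
	default:
		return errBufferFull
	}
}

func main() {
	// Capacity 1 makes the "buffer full" path easy to hit; the real
	// driver uses a 4096-entry channel of *logger.Message.
	buf := &nonBlockingBuffer{messages: make(chan message, 1)}

	fmt.Println(buf.Log("first"))  // <nil>: the message fits in the buffer
	fmt.Println(buf.Log("second")) // awslogs buffer is full: dropped, caller is not blocked
}

As the commit message states, the default path is unchanged: the driver only takes this branch when --log-opt mode=non-blocking is set, and only in that mode does New() defer stream creation to a retrying goroutine instead of failing container start when CloudWatch Logs is unreachable.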