From bd6132334207f0d85d8a801995f13aea19ae95fa Mon Sep 17 00:00:00 2001 From: Jacob Vallejo Date: Tue, 22 Aug 2017 10:52:52 -0700 Subject: [PATCH] logger: copy to log driver's bufsize Log drivers may have an internal buffer size that can be accommodated by the copier as it is more effective to buffer and send fewer though larger messages that the log driver can consume. This eliminates the need for Partial handling for drivers that do not support the concept (ie: awslogs, which can only have events up to service limits). Signed-off-by: Jacob Vallejo Upstream-commit: e1ada0b885b31de0bb0e79b4d99ae4d48b65f721 Component: engine --- .../daemon/logger/awslogs/cloudwatchlogs.go | 4 ++ .../logger/awslogs/cloudwatchlogs_test.go | 5 ++ components/engine/daemon/logger/copier.go | 13 +++- .../engine/daemon/logger/copier_test.go | 61 +++++++++++++++++-- components/engine/daemon/logger/logger.go | 7 +++ 5 files changed, 85 insertions(+), 5 deletions(-) diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go index 3a7f2f631d..de168cfd73 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go @@ -245,6 +245,10 @@ func (l *logStream) Name() string { return name } +func (l *logStream) BufSize() int { + return maximumBytesPerEvent +} + // Log submits messages for logging by an instance of the awslogs logging driver func (l *logStream) Log(msg *logger.Message) error { l.lock.RLock() diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go index 989eb6f52c..d9bffc576f 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -1049,6 +1049,11 @@ func TestCreateTagSuccess(t *testing.T) { } } +func TestIsSizedLogger(t *testing.T) { + awslogs := &logStream{} + assert.Implements(t, (*logger.SizedLogger)(nil), awslogs, "awslogs should implement SizedLogger") +} + func BenchmarkUnwrapEvents(b *testing.B) { events := make([]wrappedEvent, maximumLogEventsPerPut) for i := 0; i < maximumLogEventsPerPut; i++ { diff --git a/components/engine/daemon/logger/copier.go b/components/engine/daemon/logger/copier.go index c773fc6a29..a1d4f06f8b 100644 --- a/components/engine/daemon/logger/copier.go +++ b/components/engine/daemon/logger/copier.go @@ -10,8 +10,13 @@ import ( ) const ( - bufSize = 16 * 1024 + // readSize is the maximum bytes read during a single read + // operation. readSize = 2 * 1024 + + // defaultBufSize provides a reasonable default for loggers that do + // not have an external limit to impose on log line size. + defaultBufSize = 16 * 1024 ) // Copier can copy logs from specified sources to Logger and attach Timestamp. @@ -44,7 +49,13 @@ func (c *Copier) Run() { func (c *Copier) copySrc(name string, src io.Reader) { defer c.copyJobs.Done() + + bufSize := defaultBufSize + if sizedLogger, ok := c.dst.(SizedLogger); ok { + bufSize = sizedLogger.BufSize() + } buf := make([]byte, bufSize) + n := 0 eof := false diff --git a/components/engine/daemon/logger/copier_test.go b/components/engine/daemon/logger/copier_test.go index 4210022dcd..a911a703e9 100644 --- a/components/engine/daemon/logger/copier_test.go +++ b/components/engine/daemon/logger/copier_test.go @@ -31,6 +31,25 @@ func (l *TestLoggerJSON) Close() error { return nil } func (l *TestLoggerJSON) Name() string { return "json" } +type TestSizedLoggerJSON struct { + *json.Encoder + mu sync.Mutex +} + +func (l *TestSizedLoggerJSON) Log(m *Message) error { + l.mu.Lock() + defer l.mu.Unlock() + return l.Encode(m) +} + +func (*TestSizedLoggerJSON) Close() error { return nil } + +func (*TestSizedLoggerJSON) Name() string { return "sized-json" } + +func (*TestSizedLoggerJSON) BufSize() int { + return 32 * 1024 +} + func TestCopier(t *testing.T) { stdoutLine := "Line that thinks that it is log line from docker stdout" stderrLine := "Line that thinks that it is log line from docker stderr" @@ -104,10 +123,9 @@ func TestCopier(t *testing.T) { // TestCopierLongLines tests long lines without line breaks func TestCopierLongLines(t *testing.T) { - // Long lines (should be split at "bufSize") - const bufSize = 16 * 1024 - stdoutLongLine := strings.Repeat("a", bufSize) - stderrLongLine := strings.Repeat("b", bufSize) + // Long lines (should be split at "defaultBufSize") + stdoutLongLine := strings.Repeat("a", defaultBufSize) + stderrLongLine := strings.Repeat("b", defaultBufSize) stdoutTrailingLine := "stdout trailing line" stderrTrailingLine := "stderr trailing line" @@ -205,6 +223,41 @@ func TestCopierSlow(t *testing.T) { } } +func TestCopierWithSized(t *testing.T) { + var jsonBuf bytes.Buffer + expectedMsgs := 2 + sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} + logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs)) + c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger) + + c.Run() + // Wait for Copier to finish writing to the buffered logger. + c.Wait() + c.Close() + + recvdMsgs := 0 + dec := json.NewDecoder(&jsonBuf) + for { + var msg Message + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + if msg.Source != "stdout" { + t.Fatalf("Wrong Source: %q, should be %q", msg.Source, "stdout") + } + if len(msg.Line) != sizedLogger.BufSize() { + t.Fatalf("Line was not of expected max length %d, was %d", sizedLogger.BufSize(), len(msg.Line)) + } + recvdMsgs++ + } + if recvdMsgs != expectedMsgs { + t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs) + } +} + type BenchmarkLoggerDummy struct { } diff --git a/components/engine/daemon/logger/logger.go b/components/engine/daemon/logger/logger.go index a9d1e7640b..9aa45aabdf 100644 --- a/components/engine/daemon/logger/logger.go +++ b/components/engine/daemon/logger/logger.go @@ -81,6 +81,13 @@ type Logger interface { Close() error } +// SizedLogger is the interface for logging drivers that can control +// the size of buffer used for their messages. +type SizedLogger interface { + Logger + BufSize() int +} + // ReadConfig is the configuration passed into ReadLogs. type ReadConfig struct { Since time.Time