diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go index 3a7f2f631d..de168cfd73 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs.go @@ -245,6 +245,10 @@ func (l *logStream) Name() string { return name } +func (l *logStream) BufSize() int { + return maximumBytesPerEvent +} + // Log submits messages for logging by an instance of the awslogs logging driver func (l *logStream) Log(msg *logger.Message) error { l.lock.RLock() diff --git a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go index 989eb6f52c..d9bffc576f 100644 --- a/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -1049,6 +1049,11 @@ func TestCreateTagSuccess(t *testing.T) { } } +func TestIsSizedLogger(t *testing.T) { + awslogs := &logStream{} + assert.Implements(t, (*logger.SizedLogger)(nil), awslogs, "awslogs should implement SizedLogger") +} + func BenchmarkUnwrapEvents(b *testing.B) { events := make([]wrappedEvent, maximumLogEventsPerPut) for i := 0; i < maximumLogEventsPerPut; i++ { diff --git a/components/engine/daemon/logger/copier.go b/components/engine/daemon/logger/copier.go index c773fc6a29..a1d4f06f8b 100644 --- a/components/engine/daemon/logger/copier.go +++ b/components/engine/daemon/logger/copier.go @@ -10,8 +10,13 @@ import ( ) const ( - bufSize = 16 * 1024 + // readSize is the maximum bytes read during a single read + // operation. readSize = 2 * 1024 + + // defaultBufSize provides a reasonable default for loggers that do + // not have an external limit to impose on log line size. + defaultBufSize = 16 * 1024 ) // Copier can copy logs from specified sources to Logger and attach Timestamp. @@ -44,7 +49,13 @@ func (c *Copier) Run() { func (c *Copier) copySrc(name string, src io.Reader) { defer c.copyJobs.Done() + + bufSize := defaultBufSize + if sizedLogger, ok := c.dst.(SizedLogger); ok { + bufSize = sizedLogger.BufSize() + } buf := make([]byte, bufSize) + n := 0 eof := false diff --git a/components/engine/daemon/logger/copier_test.go b/components/engine/daemon/logger/copier_test.go index 4210022dcd..a911a703e9 100644 --- a/components/engine/daemon/logger/copier_test.go +++ b/components/engine/daemon/logger/copier_test.go @@ -31,6 +31,25 @@ func (l *TestLoggerJSON) Close() error { return nil } func (l *TestLoggerJSON) Name() string { return "json" } +type TestSizedLoggerJSON struct { + *json.Encoder + mu sync.Mutex +} + +func (l *TestSizedLoggerJSON) Log(m *Message) error { + l.mu.Lock() + defer l.mu.Unlock() + return l.Encode(m) +} + +func (*TestSizedLoggerJSON) Close() error { return nil } + +func (*TestSizedLoggerJSON) Name() string { return "sized-json" } + +func (*TestSizedLoggerJSON) BufSize() int { + return 32 * 1024 +} + func TestCopier(t *testing.T) { stdoutLine := "Line that thinks that it is log line from docker stdout" stderrLine := "Line that thinks that it is log line from docker stderr" @@ -104,10 +123,9 @@ func TestCopier(t *testing.T) { // TestCopierLongLines tests long lines without line breaks func TestCopierLongLines(t *testing.T) { - // Long lines (should be split at "bufSize") - const bufSize = 16 * 1024 - stdoutLongLine := strings.Repeat("a", bufSize) - stderrLongLine := strings.Repeat("b", bufSize) + // Long lines (should be split at "defaultBufSize") + stdoutLongLine := strings.Repeat("a", defaultBufSize) + stderrLongLine := strings.Repeat("b", defaultBufSize) stdoutTrailingLine := "stdout trailing line" stderrTrailingLine := "stderr trailing line" @@ -205,6 +223,41 @@ func TestCopierSlow(t *testing.T) { } } +func TestCopierWithSized(t *testing.T) { + var jsonBuf bytes.Buffer + expectedMsgs := 2 + sizedLogger := &TestSizedLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)} + logbuf := bytes.NewBufferString(strings.Repeat(".", sizedLogger.BufSize()*expectedMsgs)) + c := NewCopier(map[string]io.Reader{"stdout": logbuf}, sizedLogger) + + c.Run() + // Wait for Copier to finish writing to the buffered logger. + c.Wait() + c.Close() + + recvdMsgs := 0 + dec := json.NewDecoder(&jsonBuf) + for { + var msg Message + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + break + } + t.Fatal(err) + } + if msg.Source != "stdout" { + t.Fatalf("Wrong Source: %q, should be %q", msg.Source, "stdout") + } + if len(msg.Line) != sizedLogger.BufSize() { + t.Fatalf("Line was not of expected max length %d, was %d", sizedLogger.BufSize(), len(msg.Line)) + } + recvdMsgs++ + } + if recvdMsgs != expectedMsgs { + t.Fatalf("expected to receive %d messages, actually received %d", expectedMsgs, recvdMsgs) + } +} + type BenchmarkLoggerDummy struct { } diff --git a/components/engine/daemon/logger/logger.go b/components/engine/daemon/logger/logger.go index a9d1e7640b..9aa45aabdf 100644 --- a/components/engine/daemon/logger/logger.go +++ b/components/engine/daemon/logger/logger.go @@ -81,6 +81,13 @@ type Logger interface { Close() error } +// SizedLogger is the interface for logging drivers that can control +// the size of buffer used for their messages. +type SizedLogger interface { + Logger + BufSize() int +} + // ReadConfig is the configuration passed into ReadLogs. type ReadConfig struct { Since time.Time