diff --git a/components/engine/builder/builder.go b/components/engine/builder/builder.go index f7a4f91f27..e6edc85c82 100644 --- a/components/engine/builder/builder.go +++ b/components/engine/builder/builder.go @@ -114,7 +114,7 @@ type Backend interface { // PullOnBuild tells Docker to pull image referenced by `name`. PullOnBuild(ctx context.Context, name string, authConfigs map[string]types.AuthConfig, output io.Writer) (Image, error) // ContainerAttachRaw attaches to container. - ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool) error + ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool, attached chan struct{}) error // ContainerCreate creates a new Docker container and returns potential warnings ContainerCreate(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error) // ContainerRm removes a container specified by `id`. diff --git a/components/engine/builder/dockerfile/internals.go b/components/engine/builder/dockerfile/internals.go index e4e59e87f8..4ef7beb08c 100644 --- a/components/engine/builder/dockerfile/internals.go +++ b/components/engine/builder/dockerfile/internals.go @@ -573,11 +573,18 @@ func (b *Builder) create() (string, error) { var errCancelled = errors.New("build cancelled") func (b *Builder) run(cID string) (err error) { + attached := make(chan struct{}) errCh := make(chan error) go func() { - errCh <- b.docker.ContainerAttachRaw(cID, nil, b.Stdout, b.Stderr, true) + errCh <- b.docker.ContainerAttachRaw(cID, nil, b.Stdout, b.Stderr, true, attached) }() + select { + case err := <-errCh: + return err + case <-attached: + } + finished := make(chan struct{}) cancelErrCh := make(chan error, 1) go func() { diff --git a/components/engine/builder/dockerfile/mockbackend_test.go b/components/engine/builder/dockerfile/mockbackend_test.go index 4c03569678..d64125b082 100644 --- a/components/engine/builder/dockerfile/mockbackend_test.go +++ b/components/engine/builder/dockerfile/mockbackend_test.go @@ -33,7 +33,7 @@ func (m *MockBackend) PullOnBuild(ctx context.Context, name string, authConfigs return nil, nil } -func (m *MockBackend) ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool) error { +func (m *MockBackend) ContainerAttachRaw(cID string, stdin io.ReadCloser, stdout, stderr io.Writer, stream bool, attached chan struct{}) error { return nil } diff --git a/components/engine/daemon/attach.go b/components/engine/daemon/attach.go index fb213132f8..0a6c05dc2c 100644 --- a/components/engine/daemon/attach.go +++ b/components/engine/daemon/attach.go @@ -73,7 +73,7 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA } // ContainerAttachRaw attaches the provided streams to the container's stdio -func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, doStream bool) error { +func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadCloser, stdout, stderr io.Writer, doStream bool, attached chan struct{}) error { container, err := daemon.GetContainer(prefixOrName) if err != nil { return err @@ -86,6 +86,7 @@ func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadClose CloseStdin: container.Config.StdinOnce, } container.StreamConfig.AttachStreams(&cfg) + close(attached) if cfg.UseStdin { cfg.Stdin = stdin } @@ -101,15 +102,23 @@ func (daemon *Daemon) ContainerAttachRaw(prefixOrName string, stdin io.ReadClose func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.AttachConfig, logs, doStream bool) error { if logs { - logDriver, err := daemon.getLogger(c) + logDriver, logCreated, err := daemon.getLogger(c) if err != nil { return err } + if logCreated { + defer func() { + if err = logDriver.Close(); err != nil { + logrus.Errorf("Error closing logger: %v", err) + } + }() + } cLog, ok := logDriver.(logger.LogReader) if !ok { return logger.ErrReadLogsNotSupported } logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) + defer logs.Close() LogLoop: for { diff --git a/components/engine/daemon/logger/journald/journald.go b/components/engine/daemon/logger/journald/journald.go index 04ae84b6d9..9865a273c5 100644 --- a/components/engine/daemon/logger/journald/journald.go +++ b/components/engine/daemon/logger/journald/journald.go @@ -18,12 +18,13 @@ import ( const name = "journald" type journald struct { + mu sync.Mutex vars map[string]string // additional variables and values to send to the journal along with the log message readers readerList + closed bool } type readerList struct { - mu sync.Mutex readers map[*logger.LogWatcher]*logger.LogWatcher } diff --git a/components/engine/daemon/logger/journald/read.go b/components/engine/daemon/logger/journald/read.go index 9b896e0dc7..9ecc3b521d 100644 --- a/components/engine/daemon/logger/journald/read.go +++ b/components/engine/daemon/logger/journald/read.go @@ -161,11 +161,12 @@ import ( ) func (s *journald) Close() error { - s.readers.mu.Lock() + s.mu.Lock() + s.closed = true for reader := range s.readers.readers { reader.Close() } - s.readers.mu.Unlock() + s.mu.Unlock() return nil } @@ -245,9 +246,16 @@ drain: } func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig, j *C.sd_journal, pfd [2]C.int, cursor *C.char) *C.char { - s.readers.mu.Lock() + s.mu.Lock() s.readers.readers[logWatcher] = logWatcher - s.readers.mu.Unlock() + if s.closed { + // the journald Logger is closed, presumably because the container has been + // reset. So we shouldn't follow, because we'll never be woken up. But we + // should make one more drainJournal call to be sure we've got all the logs. + // Close pfd[1] so that one drainJournal happens, then cleanup, then return. + C.close(pfd[1]) + } + s.mu.Unlock() newCursor := make(chan *C.char) @@ -274,22 +282,22 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, config logger.Re // Clean up. C.close(pfd[0]) - s.readers.mu.Lock() + s.mu.Lock() delete(s.readers.readers, logWatcher) - s.readers.mu.Unlock() + s.mu.Unlock() close(logWatcher.Msg) newCursor <- cursor }() // Wait until we're told to stop. select { + case cursor = <-newCursor: case <-logWatcher.WatchClose(): // Notify the other goroutine that its work is done. C.close(pfd[1]) + cursor = <-newCursor } - cursor = <-newCursor - return cursor } diff --git a/components/engine/daemon/logger/jsonfilelog/jsonfilelog.go b/components/engine/daemon/logger/jsonfilelog/jsonfilelog.go index 5ad701a0d7..797644e669 100644 --- a/components/engine/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/components/engine/daemon/logger/jsonfilelog/jsonfilelog.go @@ -27,6 +27,7 @@ type JSONFileLogger struct { mu sync.Mutex readers map[*logger.LogWatcher]struct{} // stores the active log followers extra []byte // json-encoded extra attributes + closed bool } func init() { @@ -142,6 +143,7 @@ func (l *JSONFileLogger) LogPath() string { // Close closes underlying file and signals all readers to stop. func (l *JSONFileLogger) Close() error { l.mu.Lock() + l.closed = true err := l.writer.Close() for r := range l.readers { r.Close() diff --git a/components/engine/daemon/logger/jsonfilelog/read.go b/components/engine/daemon/logger/jsonfilelog/read.go index 30d533fc1f..9f282eafbb 100644 --- a/components/engine/daemon/logger/jsonfilelog/read.go +++ b/components/engine/daemon/logger/jsonfilelog/read.go @@ -88,10 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R } } - if !config.Follow { - if err := latestFile.Close(); err != nil { - logrus.Errorf("Error closing file: %v", err) - } + if !config.Follow || l.closed { l.mu.Unlock() return } @@ -100,17 +97,18 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R latestFile.Seek(0, os.SEEK_END) } + notifyRotate := l.writer.NotifyRotate() + defer l.writer.NotifyRotateEvict(notifyRotate) + l.readers[logWatcher] = struct{}{} + l.mu.Unlock() - notifyRotate := l.writer.NotifyRotate() followLogs(latestFile, logWatcher, notifyRotate, config.Since) l.mu.Lock() delete(l.readers, logWatcher) l.mu.Unlock() - - l.writer.NotifyRotateEvict(notifyRotate) } func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { diff --git a/components/engine/daemon/logs.go b/components/engine/daemon/logs.go index b207fb693e..96e1b8a491 100644 --- a/components/engine/daemon/logs.go +++ b/components/engine/daemon/logs.go @@ -45,17 +45,24 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c return nil, logger.ErrReadLogsNotSupported } - cLog, err := daemon.getLogger(container) + cLog, cLogCreated, err := daemon.getLogger(container) if err != nil { return nil, err } + if cLogCreated { + defer func() { + if err = cLog.Close(); err != nil { + logrus.Errorf("Error closing logger: %v", err) + } + }() + } logReader, ok := cLog.(logger.LogReader) if !ok { return nil, logger.ErrReadLogsNotSupported } - follow := config.Follow && container.IsRunning() + follow := config.Follow && !cLogCreated tailLines, err := strconv.Atoi(config.Tail) if err != nil { tailLines = -1 @@ -85,23 +92,8 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c messageChan := make(chan *backend.LogMessage, 1) go func() { // set up some defers - defer func() { - // ok so this function, originally, was placed right after that - // logger.ReadLogs call above. I THINK that means it sets off the - // chain of events that results in the logger needing to be closed. - // i do not know if an error in time parsing above causing an early - // return will result in leaking the logger. if that is the case, - // it would also have been a bug in the original code - logs.Close() - if cLog != container.LogDriver { - // Since the logger isn't cached in the container, which - // occurs if it is running, it must get explicitly closed - // here to avoid leaking it and any file handles it has. - if err := cLog.Close(); err != nil { - logrus.Errorf("Error closing logger: %v", err) - } - } - }() + defer logs.Close() + // close the messages channel. closing is the only way to signal above // that we're doing with logs (other than context cancel i guess). defer close(messageChan) @@ -148,11 +140,17 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c return messageChan, nil } -func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger, error) { - if container.LogDriver != nil && container.IsRunning() { - return container.LogDriver, nil +func (daemon *Daemon) getLogger(container *container.Container) (l logger.Logger, created bool, err error) { + container.Lock() + if container.State.Running { + l = container.LogDriver } - return container.StartLogger() + container.Unlock() + if l == nil { + created = true + l, err = container.StartLogger() + } + return } // mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified.