Merge pull request #48 from kolyshkin/18.09-backport-logs-follow
[18.09] backport "daemon.ContainerLogs(): fix resource leak on follow" Upstream-commit: 6531bac59bfd453456231511bdc3efade1fc9481 Component: engine
This commit is contained in:
@ -123,7 +123,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
|
||||
return logger.ErrReadLogsNotSupported{}
|
||||
}
|
||||
logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
|
||||
defer logs.Close()
|
||||
defer logs.ConsumerGone()
|
||||
|
||||
LogLoop:
|
||||
for {
|
||||
|
||||
@ -93,21 +93,12 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
|
||||
|
||||
dec := logdriver.NewLogEntryDecoder(stream)
|
||||
for {
|
||||
select {
|
||||
case <-watcher.WatchClose():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
var buf logdriver.LogEntry
|
||||
if err := dec.Decode(&buf); err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case watcher.Err <- errors.Wrap(err, "error decoding log message"):
|
||||
case <-watcher.WatchClose():
|
||||
}
|
||||
watcher.Err <- errors.Wrap(err, "error decoding log message")
|
||||
return
|
||||
}
|
||||
|
||||
@ -125,11 +116,10 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
|
||||
return
|
||||
}
|
||||
|
||||
// send the message unless the consumer is gone
|
||||
select {
|
||||
case watcher.Msg <- msg:
|
||||
case <-watcher.WatchClose():
|
||||
// make sure the message we consumed is sent
|
||||
watcher.Msg <- msg
|
||||
case <-watcher.WatchConsumerGone():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) {
|
||||
t.Fatal("timeout waiting for message channel to close")
|
||||
|
||||
}
|
||||
lw.Close()
|
||||
lw.ProducerGone()
|
||||
|
||||
lw = lr.ReadLogs(ReadConfig{Follow: true})
|
||||
for _, x := range testMsg {
|
||||
|
||||
@ -165,7 +165,7 @@ func (s *journald) Close() error {
|
||||
s.mu.Lock()
|
||||
s.closed = true
|
||||
for reader := range s.readers.readers {
|
||||
reader.Close()
|
||||
reader.ProducerGone()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
@ -299,7 +299,7 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,
|
||||
// Wait until we're told to stop.
|
||||
select {
|
||||
case cursor = <-newCursor:
|
||||
case <-logWatcher.WatchClose():
|
||||
case <-logWatcher.WatchConsumerGone():
|
||||
// Notify the other goroutine that its work is done.
|
||||
C.close(pfd[1])
|
||||
cursor = <-newCursor
|
||||
|
||||
@ -166,13 +166,14 @@ func ValidateLogOpt(cfg map[string]string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes underlying file and signals all readers to stop.
|
||||
// Close closes underlying file and signals all the readers
|
||||
// that the logs producer is gone.
|
||||
func (l *JSONFileLogger) Close() error {
|
||||
l.mu.Lock()
|
||||
l.closed = true
|
||||
err := l.writer.Close()
|
||||
for r := range l.readers {
|
||||
r.Close()
|
||||
r.ProducerGone()
|
||||
delete(l.readers, r)
|
||||
}
|
||||
l.mu.Unlock()
|
||||
|
||||
@ -50,11 +50,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
|
||||
}()
|
||||
|
||||
lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true})
|
||||
watchClose := lw.WatchClose()
|
||||
for {
|
||||
select {
|
||||
case <-lw.Msg:
|
||||
case <-watchClose:
|
||||
case <-lw.WatchProducerGone():
|
||||
return
|
||||
case err := <-chError:
|
||||
if err != nil {
|
||||
|
||||
@ -166,7 +166,7 @@ func (d *driver) Close() error {
|
||||
d.closed = true
|
||||
err := d.logfile.Close()
|
||||
for r := range d.readers {
|
||||
r.Close()
|
||||
r.ProducerGone()
|
||||
delete(d.readers, r)
|
||||
}
|
||||
d.mu.Unlock()
|
||||
|
||||
@ -104,33 +104,50 @@ type LogWatcher struct {
|
||||
// For sending log messages to a reader.
|
||||
Msg chan *Message
|
||||
// For sending error messages that occur while while reading logs.
|
||||
Err chan error
|
||||
closeOnce sync.Once
|
||||
closeNotifier chan struct{}
|
||||
Err chan error
|
||||
producerOnce sync.Once
|
||||
producerGone chan struct{}
|
||||
consumerOnce sync.Once
|
||||
consumerGone chan struct{}
|
||||
}
|
||||
|
||||
// NewLogWatcher returns a new LogWatcher.
|
||||
func NewLogWatcher() *LogWatcher {
|
||||
return &LogWatcher{
|
||||
Msg: make(chan *Message, logWatcherBufferSize),
|
||||
Err: make(chan error, 1),
|
||||
closeNotifier: make(chan struct{}),
|
||||
Msg: make(chan *Message, logWatcherBufferSize),
|
||||
Err: make(chan error, 1),
|
||||
producerGone: make(chan struct{}),
|
||||
consumerGone: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Close notifies the underlying log reader to stop.
|
||||
func (w *LogWatcher) Close() {
|
||||
// ProducerGone notifies the underlying log reader that
|
||||
// the logs producer (a container) is gone.
|
||||
func (w *LogWatcher) ProducerGone() {
|
||||
// only close if not already closed
|
||||
w.closeOnce.Do(func() {
|
||||
close(w.closeNotifier)
|
||||
w.producerOnce.Do(func() {
|
||||
close(w.producerGone)
|
||||
})
|
||||
}
|
||||
|
||||
// WatchClose returns a channel receiver that receives notification
|
||||
// when the watcher has been closed. This should only be called from
|
||||
// one goroutine.
|
||||
func (w *LogWatcher) WatchClose() <-chan struct{} {
|
||||
return w.closeNotifier
|
||||
// WatchProducerGone returns a channel receiver that receives notification
|
||||
// once the logs producer (a container) is gone.
|
||||
func (w *LogWatcher) WatchProducerGone() <-chan struct{} {
|
||||
return w.producerGone
|
||||
}
|
||||
|
||||
// ConsumerGone notifies that the logs consumer is gone.
|
||||
func (w *LogWatcher) ConsumerGone() {
|
||||
// only close if not already closed
|
||||
w.consumerOnce.Do(func() {
|
||||
close(w.consumerGone)
|
||||
})
|
||||
}
|
||||
|
||||
// WatchConsumerGone returns a channel receiver that receives notification
|
||||
// when the log watcher consumer is gone.
|
||||
func (w *LogWatcher) WatchConsumerGone() <-chan struct{} {
|
||||
return w.consumerGone
|
||||
}
|
||||
|
||||
// Capability defines the list of capabilities that a driver can implement
|
||||
|
||||
@ -488,7 +488,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-watcher.WatchClose():
|
||||
case <-watcher.WatchConsumerGone():
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
@ -546,22 +546,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
||||
}
|
||||
defer func() {
|
||||
f.Close()
|
||||
fileWatcher.Remove(name)
|
||||
fileWatcher.Close()
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
select {
|
||||
case <-logWatcher.WatchClose():
|
||||
fileWatcher.Remove(name)
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
var retries int
|
||||
handleRotate := func() error {
|
||||
f.Close()
|
||||
@ -596,7 +583,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
||||
case fsnotify.Rename, fsnotify.Remove:
|
||||
select {
|
||||
case <-notifyRotate:
|
||||
case <-ctx.Done():
|
||||
case <-logWatcher.WatchProducerGone():
|
||||
return errDone
|
||||
case <-logWatcher.WatchConsumerGone():
|
||||
return errDone
|
||||
}
|
||||
if err := handleRotate(); err != nil {
|
||||
@ -618,7 +607,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
||||
return errRetry
|
||||
}
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
case <-logWatcher.WatchProducerGone():
|
||||
return errDone
|
||||
case <-logWatcher.WatchConsumerGone():
|
||||
return errDone
|
||||
}
|
||||
}
|
||||
@ -664,23 +655,11 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
// send the message, unless the consumer is gone
|
||||
select {
|
||||
case logWatcher.Msg <- msg:
|
||||
case <-ctx.Done():
|
||||
logWatcher.Msg <- msg
|
||||
for {
|
||||
msg, err := decodeLogLine()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !since.IsZero() && msg.Timestamp.Before(since) {
|
||||
continue
|
||||
}
|
||||
if !until.IsZero() && msg.Timestamp.After(until) {
|
||||
return
|
||||
}
|
||||
logWatcher.Msg <- msg
|
||||
}
|
||||
case <-logWatcher.WatchConsumerGone():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,6 +4,8 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -74,3 +76,128 @@ func TestTailFiles(t *testing.T) {
|
||||
assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFollowLogsConsumerGone(t *testing.T) {
|
||||
lw := logger.NewLogWatcher()
|
||||
|
||||
f, err := ioutil.TempFile("", t.Name())
|
||||
assert.NilError(t, err)
|
||||
defer func() {
|
||||
f.Close()
|
||||
os.Remove(f.Name())
|
||||
}()
|
||||
|
||||
makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
|
||||
return func() (*logger.Message, error) {
|
||||
return &logger.Message{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
followLogsDone := make(chan struct{})
|
||||
var since, until time.Time
|
||||
go func() {
|
||||
followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
|
||||
close(followLogsDone)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-lw.Msg:
|
||||
case err := <-lw.Err:
|
||||
assert.NilError(t, err)
|
||||
case <-followLogsDone:
|
||||
t.Fatal("follow logs finished unexpectedly")
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for log message")
|
||||
}
|
||||
|
||||
lw.ConsumerGone()
|
||||
select {
|
||||
case <-followLogsDone:
|
||||
case <-time.After(20 * time.Second):
|
||||
t.Fatal("timeout waiting for followLogs() to finish")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFollowLogsProducerGone(t *testing.T) {
|
||||
lw := logger.NewLogWatcher()
|
||||
|
||||
f, err := ioutil.TempFile("", t.Name())
|
||||
assert.NilError(t, err)
|
||||
defer os.Remove(f.Name())
|
||||
|
||||
var sent, received, closed int
|
||||
makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
|
||||
return func() (*logger.Message, error) {
|
||||
if closed == 1 {
|
||||
closed++
|
||||
t.Logf("logDecode() closed after sending %d messages\n", sent)
|
||||
return nil, io.EOF
|
||||
} else if closed > 1 {
|
||||
t.Fatal("logDecode() called after closing!")
|
||||
return nil, io.EOF
|
||||
}
|
||||
sent++
|
||||
return &logger.Message{}, nil
|
||||
}
|
||||
}
|
||||
var since, until time.Time
|
||||
|
||||
followLogsDone := make(chan struct{})
|
||||
go func() {
|
||||
followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
|
||||
close(followLogsDone)
|
||||
}()
|
||||
|
||||
// read 1 message
|
||||
select {
|
||||
case <-lw.Msg:
|
||||
received++
|
||||
case err := <-lw.Err:
|
||||
assert.NilError(t, err)
|
||||
case <-followLogsDone:
|
||||
t.Fatal("followLogs() finished unexpectedly")
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for log message")
|
||||
}
|
||||
|
||||
// "stop" the "container"
|
||||
closed = 1
|
||||
lw.ProducerGone()
|
||||
|
||||
// should receive all the messages sent
|
||||
readDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(readDone)
|
||||
for {
|
||||
select {
|
||||
case <-lw.Msg:
|
||||
received++
|
||||
if received == sent {
|
||||
return
|
||||
}
|
||||
case err := <-lw.Err:
|
||||
assert.NilError(t, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-readDone:
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received)
|
||||
}
|
||||
|
||||
t.Logf("messages sent: %d, received: %d", sent, received)
|
||||
|
||||
// followLogs() should be done by now
|
||||
select {
|
||||
case <-followLogsDone:
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatal("timeout waiting for followLogs() to finish")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-lw.WatchConsumerGone():
|
||||
t.Fatal("consumer should not have exited")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,14 +110,16 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
|
||||
}
|
||||
}()
|
||||
}
|
||||
// set up some defers
|
||||
defer logs.Close()
|
||||
// signal that the log reader is gone
|
||||
defer logs.ConsumerGone()
|
||||
|
||||
// close the messages channel. closing is the only way to signal above
|
||||
// that we're doing with logs (other than context cancel i guess).
|
||||
defer close(messageChan)
|
||||
|
||||
lg.Debug("begin logs")
|
||||
defer lg.Debugf("end logs (%v)", ctx.Err())
|
||||
|
||||
for {
|
||||
select {
|
||||
// i do not believe as the system is currently designed any error
|
||||
@ -132,14 +134,12 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
|
||||
return
|
||||
case msg, ok := <-logs.Msg:
|
||||
// there is some kind of pool or ring buffer in the logger that
|
||||
// produces these messages, and a possible future optimization
|
||||
// might be to use that pool and reuse message objects
|
||||
if !ok {
|
||||
lg.Debug("end logs")
|
||||
return
|
||||
}
|
||||
m := msg.AsLogMessage() // just a pointer conversion, does not copy data
|
||||
|
||||
@ -54,6 +54,7 @@ func (w *filePoller) Add(name string) error {
|
||||
}
|
||||
fi, err := os.Stat(name)
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
@ -61,6 +62,7 @@ func (w *filePoller) Add(name string) error {
|
||||
w.watches = make(map[string]chan struct{})
|
||||
}
|
||||
if _, exists := w.watches[name]; exists {
|
||||
f.Close()
|
||||
return fmt.Errorf("watch exists")
|
||||
}
|
||||
chClose := make(chan struct{})
|
||||
@ -113,11 +115,10 @@ func (w *filePoller) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
w.closed = true
|
||||
for name := range w.watches {
|
||||
w.remove(name)
|
||||
delete(w.watches, name)
|
||||
}
|
||||
w.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -146,12 +147,11 @@ func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error {
|
||||
func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) {
|
||||
defer f.Close()
|
||||
for {
|
||||
time.Sleep(watchWaitTime)
|
||||
select {
|
||||
case <-time.After(watchWaitTime):
|
||||
case <-chClose:
|
||||
logrus.Debugf("watch for %s closed", f.Name())
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
fi, err := os.Stat(f.Name())
|
||||
|
||||
Reference in New Issue
Block a user