Add context.RequestID to event stream

This PR adds a "request ID" to each event generated, the 'docker events'
stream now looks like this:

```
2015-09-10T15:02:50.000000000-07:00 [reqid: c01e3534ddca] de7c5d4ca927253cf4e978ee9c4545161e406e9b5a14617efb52c658b249174a: (from ubuntu) create
```
Note the `[reqid: c01e3534ddca]` part, that's new.

Each HTTP request will generate its own unique ID. So, if you do a
`docker build` you'll see a series of events all with the same reqID.
This allows log processing tools to determine which events are all related
to the same http request.

I didn't propagate the context to all possible funcs in the daemon,
I decided to just do the ones that needed it in order to get the reqID
into the events. I'd like to have people review this direction first, and
if we're ok with it then I'll make sure we're consistent about when
we pass around the context - IOW, make sure that all funcs at the same level
have a context passed in even if they don't call the log funcs - this will
ensure we're consistent w/o passing it around for all calls unnecessarily.

ping @icecrime @calavera @crosbymichael

Signed-off-by: Doug Davis <dug@us.ibm.com>
Upstream-commit: 26b1064967d9fcefd4c35f60e96bf6d7c9a3b5f8
Component: engine
This commit is contained in:
Doug Davis
2015-09-10 15:01:18 -07:00
parent 65bd47ac3a
commit bf44c732da
68 changed files with 737 additions and 565 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/opencontainers/runc/libcontainer/label"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/context"
"github.com/docker/docker/daemon/execdriver"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
@ -170,9 +171,10 @@ func (container *Container) writeHostConfig() error {
return ioutil.WriteFile(pth, data, 0666)
}
func (container *Container) logEvent(action string) {
func (container *Container) logEvent(ctx context.Context, action string) {
d := container.daemon
d.EventsService.Log(
ctx,
action,
container.ID,
container.Config.Image,
@ -238,7 +240,7 @@ func (container *Container) exportContainerRw() (archive.Archive, error) {
// container needs, such as storage and networking, as well as links
// between containers. The container is left waiting for a signal to
// begin running.
func (container *Container) Start() (err error) {
func (container *Container) Start(ctx context.Context) (err error) {
container.Lock()
defer container.Unlock()
@ -260,12 +262,12 @@ func (container *Container) Start() (err error) {
container.ExitCode = 128
}
container.toDisk()
container.cleanup()
container.logEvent("die")
container.cleanup(ctx)
container.logEvent(ctx, "die")
}
}()
if err := container.Mount(); err != nil {
if err := container.Mount(ctx); err != nil {
return err
}
@ -273,10 +275,10 @@ func (container *Container) Start() (err error) {
// backwards API compatibility.
container.hostConfig = runconfig.SetDefaultNetModeIfBlank(container.hostConfig)
if err := container.initializeNetworking(); err != nil {
if err := container.initializeNetworking(ctx); err != nil {
return err
}
linkedEnv, err := container.setupLinkedContainers()
linkedEnv, err := container.setupLinkedContainers(ctx)
if err != nil {
return err
}
@ -284,7 +286,7 @@ func (container *Container) Start() (err error) {
return err
}
env := container.createDaemonEnvironment(linkedEnv)
if err := populateCommand(container, env); err != nil {
if err := populateCommand(ctx, container, env); err != nil {
return err
}
@ -301,7 +303,7 @@ func (container *Container) Start() (err error) {
mounts = append(mounts, container.ipcMounts()...)
container.command.Mounts = mounts
return container.waitForStart()
return container.waitForStart(ctx)
}
// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
@ -334,14 +336,14 @@ func (container *Container) isNetworkAllocated() bool {
// cleanup releases any network resources allocated to the container along with any rules
// around how containers are linked together. It also unmounts the container's root filesystem.
func (container *Container) cleanup() {
func (container *Container) cleanup(ctx context.Context) {
container.releaseNetwork()
if err := container.unmountIpcMounts(); err != nil {
logrus.Errorf("%s: Failed to umount ipc filesystems: %v", container.ID, err)
}
if err := container.Unmount(); err != nil {
if err := container.Unmount(ctx); err != nil {
logrus.Errorf("%s: Failed to umount filesystem: %v", container.ID, err)
}
@ -357,7 +359,7 @@ func (container *Container) cleanup() {
// to send the signal. An error is returned if the container is paused
// or not running, or if there is a problem returned from the
// underlying kill command.
func (container *Container) killSig(sig int) error {
func (container *Container) killSig(ctx context.Context, sig int) error {
logrus.Debugf("Sending %d to %s", sig, container.ID)
container.Lock()
defer container.Unlock()
@ -385,13 +387,13 @@ func (container *Container) killSig(sig int) error {
if err := container.daemon.kill(container, sig); err != nil {
return err
}
container.logEvent("kill")
container.logEvent(ctx, "kill")
return nil
}
// Wrapper aroung killSig() suppressing "no such process" error.
func (container *Container) killPossiblyDeadProcess(sig int) error {
err := container.killSig(sig)
func (container *Container) killPossiblyDeadProcess(ctx context.Context, sig int) error {
err := container.killSig(ctx, sig)
if err == syscall.ESRCH {
logrus.Debugf("Cannot kill process (pid=%d) with signal %d: no such process.", container.getPID(), sig)
return nil
@ -399,7 +401,7 @@ func (container *Container) killPossiblyDeadProcess(sig int) error {
return err
}
func (container *Container) pause() error {
func (container *Container) pause(ctx context.Context) error {
container.Lock()
defer container.Unlock()
@ -417,11 +419,11 @@ func (container *Container) pause() error {
return err
}
container.Paused = true
container.logEvent("pause")
container.logEvent(ctx, "pause")
return nil
}
func (container *Container) unpause() error {
func (container *Container) unpause(ctx context.Context) error {
container.Lock()
defer container.Unlock()
@ -439,18 +441,18 @@ func (container *Container) unpause() error {
return err
}
container.Paused = false
container.logEvent("unpause")
container.logEvent(ctx, "unpause")
return nil
}
// Kill forcefully terminates a container.
func (container *Container) Kill() error {
func (container *Container) Kill(ctx context.Context) error {
if !container.IsRunning() {
return derr.ErrorCodeNotRunning.WithArgs(container.ID)
}
// 1. Send SIGKILL
if err := container.killPossiblyDeadProcess(int(syscall.SIGKILL)); err != nil {
if err := container.killPossiblyDeadProcess(ctx, int(syscall.SIGKILL)); err != nil {
// While normally we might "return err" here we're not going to
// because if we can't stop the container by this point then
// its probably because its already stopped. Meaning, between
@ -484,15 +486,15 @@ func (container *Container) Kill() error {
// process to exit. If a negative duration is given, Stop will wait
// for the initial signal forever. If the container is not running Stop returns
// immediately.
func (container *Container) Stop(seconds int) error {
func (container *Container) Stop(ctx context.Context, seconds int) error {
if !container.IsRunning() {
return nil
}
// 1. Send a SIGTERM
if err := container.killPossiblyDeadProcess(container.stopSignal()); err != nil {
if err := container.killPossiblyDeadProcess(ctx, container.stopSignal()); err != nil {
logrus.Infof("Failed to send SIGTERM to the process, force killing")
if err := container.killPossiblyDeadProcess(9); err != nil {
if err := container.killPossiblyDeadProcess(ctx, 9); err != nil {
return err
}
}
@ -501,13 +503,13 @@ func (container *Container) Stop(seconds int) error {
if _, err := container.WaitStop(time.Duration(seconds) * time.Second); err != nil {
logrus.Infof("Container %v failed to exit within %d seconds of SIGTERM - using the force", container.ID, seconds)
// 3. If it doesn't, then send SIGKILL
if err := container.Kill(); err != nil {
if err := container.Kill(ctx); err != nil {
container.WaitStop(-1 * time.Second)
return err
}
}
container.logEvent("stop")
container.logEvent(ctx, "stop")
return nil
}
@ -515,61 +517,61 @@ func (container *Container) Stop(seconds int) error {
// container. When stopping, wait for the given duration in seconds to
// gracefully stop, before forcefully terminating the container. If
// given a negative duration, wait forever for a graceful stop.
func (container *Container) Restart(seconds int) error {
func (container *Container) Restart(ctx context.Context, seconds int) error {
// Avoid unnecessarily unmounting and then directly mounting
// the container when the container stops and then starts
// again
if err := container.Mount(); err == nil {
defer container.Unmount()
if err := container.Mount(ctx); err == nil {
defer container.Unmount(ctx)
}
if err := container.Stop(seconds); err != nil {
if err := container.Stop(ctx, seconds); err != nil {
return err
}
if err := container.Start(); err != nil {
if err := container.Start(ctx); err != nil {
return err
}
container.logEvent("restart")
container.logEvent(ctx, "restart")
return nil
}
// Resize changes the TTY of the process running inside the container
// to the given height and width. The container must be running.
func (container *Container) Resize(h, w int) error {
func (container *Container) Resize(ctx context.Context, h, w int) error {
if !container.IsRunning() {
return derr.ErrorCodeNotRunning.WithArgs(container.ID)
}
if err := container.command.ProcessConfig.Terminal.Resize(h, w); err != nil {
return err
}
container.logEvent("resize")
container.logEvent(ctx, "resize")
return nil
}
func (container *Container) export() (archive.Archive, error) {
if err := container.Mount(); err != nil {
func (container *Container) export(ctx context.Context) (archive.Archive, error) {
if err := container.Mount(ctx); err != nil {
return nil, err
}
archive, err := archive.Tar(container.basefs, archive.Uncompressed)
if err != nil {
container.Unmount()
container.Unmount(ctx)
return nil, err
}
arch := ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
container.Unmount()
container.Unmount(ctx)
return err
})
container.logEvent("export")
container.logEvent(ctx, "export")
return arch, err
}
// Mount sets container.basefs
func (container *Container) Mount() error {
return container.daemon.Mount(container)
func (container *Container) Mount(ctx context.Context) error {
return container.daemon.Mount(ctx, container)
}
func (container *Container) changes() ([]archive.Change, error) {
@ -578,7 +580,7 @@ func (container *Container) changes() ([]archive.Change, error) {
return container.daemon.changes(container)
}
func (container *Container) getImage() (*image.Image, error) {
func (container *Container) getImage(ctx context.Context) (*image.Image, error) {
if container.daemon == nil {
return nil, derr.ErrorCodeImageUnregContainer
}
@ -587,7 +589,7 @@ func (container *Container) getImage() (*image.Image, error) {
// Unmount asks the daemon to release the layered filesystems that are
// mounted by the container.
func (container *Container) Unmount() error {
func (container *Container) Unmount(ctx context.Context) error {
return container.daemon.unmount(container)
}
@ -612,7 +614,7 @@ func validateID(id string) error {
return nil
}
func (container *Container) copy(resource string) (rc io.ReadCloser, err error) {
func (container *Container) copy(ctx context.Context, resource string) (rc io.ReadCloser, err error) {
container.Lock()
defer func() {
@ -624,7 +626,7 @@ func (container *Container) copy(resource string) (rc io.ReadCloser, err error)
}
}()
if err := container.Mount(); err != nil {
if err := container.Mount(ctx); err != nil {
return nil, err
}
@ -633,7 +635,7 @@ func (container *Container) copy(resource string) (rc io.ReadCloser, err error)
// unmount any volumes
container.unmountVolumes(true)
// unmount the container's rootfs
container.Unmount()
container.Unmount(ctx)
}
}()
@ -669,11 +671,11 @@ func (container *Container) copy(resource string) (rc io.ReadCloser, err error)
reader := ioutils.NewReadCloserWrapper(archive, func() error {
err := archive.Close()
container.unmountVolumes(true)
container.Unmount()
container.Unmount(ctx)
container.Unlock()
return err
})
container.logEvent("copy")
container.logEvent(ctx, "copy")
return reader, nil
}
@ -752,14 +754,14 @@ func (container *Container) startLogging() error {
return nil
}
func (container *Container) waitForStart() error {
func (container *Container) waitForStart(ctx context.Context) error {
container.monitor = newContainerMonitor(container, container.hostConfig.RestartPolicy)
// block until we either receive an error from the initial start of the container's
// process or until the process is running in the container
select {
case <-container.monitor.startSignal:
case err := <-promise.Go(container.monitor.Start):
case err := <-promise.Go(func() error { return container.monitor.Start(ctx) }):
return err
}
@ -790,11 +792,11 @@ func (container *Container) getExecIDs() []string {
return container.execCommands.List()
}
func (container *Container) exec(ExecConfig *ExecConfig) error {
func (container *Container) exec(ctx context.Context, ExecConfig *ExecConfig) error {
container.Lock()
defer container.Unlock()
callback := func(processConfig *execdriver.ProcessConfig, pid int, chOOM <-chan struct{}) error {
callback := func(ctx context.Context, processConfig *execdriver.ProcessConfig, pid int, chOOM <-chan struct{}) error {
if processConfig.Tty {
// The callback is called after the process Start()
// so we are in the parent process. In TTY mode, stdin/out/err is the PtySlave
@ -809,7 +811,7 @@ func (container *Container) exec(ExecConfig *ExecConfig) error {
// We use a callback here instead of a goroutine and an chan for
// synchronization purposes
cErr := promise.Go(func() error { return container.monitorExec(ExecConfig, callback) })
cErr := promise.Go(func() error { return container.monitorExec(ctx, ExecConfig, callback) })
// Exec should not return until the process is actually running
select {
@ -821,13 +823,13 @@ func (container *Container) exec(ExecConfig *ExecConfig) error {
return nil
}
func (container *Container) monitorExec(ExecConfig *ExecConfig, callback execdriver.DriverCallback) error {
func (container *Container) monitorExec(ctx context.Context, ExecConfig *ExecConfig, callback execdriver.DriverCallback) error {
var (
err error
exitCode int
)
pipes := execdriver.NewPipes(ExecConfig.streamConfig.stdin, ExecConfig.streamConfig.stdout, ExecConfig.streamConfig.stderr, ExecConfig.OpenStdin)
exitCode, err = container.daemon.Exec(container, ExecConfig, pipes, callback)
exitCode, err = container.daemon.Exec(ctx, container, ExecConfig, pipes, callback)
if err != nil {
logrus.Errorf("Error running command in existing container %s: %s", container.ID, err)
}
@ -860,7 +862,7 @@ func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr
return attach(&container.streamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr)
}
func (container *Container) attachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
func (container *Container) attachWithLogs(ctx context.Context, stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
if logs {
logDriver, err := container.getLogger()
if err != nil {
@ -892,7 +894,7 @@ func (container *Container) attachWithLogs(stdin io.ReadCloser, stdout, stderr i
}
}
container.logEvent("attach")
container.logEvent(ctx, "attach")
//stream
if stream {