Files
docker-cli/components/engine/libcontainerd/remote_unix.go
Antonio Murdaca 13833c9420 libcontainerd: fix reaper goroutine position
We have observed defunct containerd processes accumulating over
time while dockerd was permanently failing to restart containerd.
Due to a bug in the runContainerdDaemon() function, dockerd does not clean up
its child process if containerd exits very soon after the (re)start.

The reproducer and analysis below come from docker 1.12.x, but the bug
still applies to the latest master.

- from libcontainerd/remote_linux.go:

  329 func (r *remote) runContainerdDaemon() error {
   :
   :      // start the containerd child process
   :
  403     if err := cmd.Start(); err != nil {
  404             return err
  405     }
   :
   :      // If containerd exits very soon after (re)start, it is possible
   :      // that containerd is already in defunct state at the time when
   :      // dockerd gets here. The setOOMScore() function tries to write
   :      // to /proc/PID_OF_CONTAINERD/oom_score_adj. However, this fails
   :      // with errno EINVAL because containerd is defunct. Please see
   :      // snippets of kernel source code and further explanation below.
   :
  407     if err := setOOMScore(cmd.Process.Pid, r.oomScore); err != nil {
  408             utils.KillProcess(cmd.Process.Pid)
   :
   :              // Due to the error from write() we return here. As the
   :              // goroutine that would clean up the child has not been
   :              // started yet, containerd remains in the defunct state
   :              // and never gets reaped.
   :
  409             return err
  410     }
   :
  417     go func() {
  418             cmd.Wait()
  419             close(r.daemonWaitCh)
  420     }() // Reap our child when needed
   :
  423 }

This is the kernel function that gets invoked when dockerd tries to
write
to /proc/PID_OF_CONTAINERD/oom_score_adj.

- from fs/proc/base.c:

 1197 static ssize_t oom_score_adj_write(struct file *file, ...
 1198                                         size_t count, loff_t
*ppos)
 1199 {
   :
 1223         task = get_proc_task(file_inode(file));
   :
   :          // The defunct containerd process does not have a virtual
   :          // address space anymore, i.e. task->mm is NULL. Thus the
   :          // following code returns errno EINVAL to dockerd.
   :
 1230         if (!task->mm) {
 1231                 err = -EINVAL;
 1232                 goto err_task_lock;
 1233         }
   :
 1253 err_task_lock:
   :
 1257         return err < 0 ? err : count;
 1258 }

The purpose of the following program is to demonstrate the behavior of
the oom_score_adj_write() function in connection with a defunct process.

$ cat defunct_test.c

#include <unistd.h>

main()
{
    pid_t pid = fork();

    if (pid == 0)
        // child
        _exit(0);

    // parent
    pause();
}

$ make defunct_test
cc     defunct_test.c   -o defunct_test

$ ./defunct_test &
[1] 3142

$ ps -f | grep defunct_test | grep -v grep
root      3142  2956  0 13:04 pts/0    00:00:00 ./defunct_test
root      3143  3142  0 13:04 pts/0    00:00:00 [defunct_test] <defunct>

$ echo "ps 3143" | crash -s
  PID    PPID  CPU       TASK        ST  %MEM     VSZ    RSS  COMM
  3143   3142   2  ffff880035def300  ZO   0.0       0      0
defunct_test

$ echo "px ((struct task_struct *)0xffff880035def300)->mm" | crash -s
$1 = (struct mm_struct *) 0x0
                          ^^^ task->mm is NULL

$ cat /proc/3143/oom_score_adj
0

$ echo 0 > /proc/3143/oom_score_adj
-bash: echo: write error: Invalid argument

---

This patch fixes the above issue by making sure we start the reaper
goroutine as soon as possible.

Signed-off-by: Antonio Murdaca <runcom@redhat.com>
Upstream-commit: 27087eacbf96e6ef9d48a6d3dc89c7c1cff155b4
Component: engine
2017-05-27 15:13:59 +02:00

551 lines
13 KiB
Go

// +build linux solaris
package libcontainerd
import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"os/exec"
"path/filepath"
goruntime "runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
containerd "github.com/containerd/containerd/api/grpc/types"
"github.com/docker/docker/pkg/locker"
"github.com/docker/docker/pkg/system"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/transport"
)
// Tunables and well-known file names used when managing the containerd daemon.
const (
// maxConnectionRetryCount is the number of failed health checks that
// handleConnectionChange accumulates before killing and restarting containerd.
maxConnectionRetryCount = 3
// containerdHealthCheckTimeout bounds each gRPC health check request.
containerdHealthCheckTimeout = 3 * time.Second
// containerdShutdownTimeout is how long Cleanup waits after SIGTERM before
// falling back to SIGKILL.
containerdShutdownTimeout = 15 * time.Second
// containerdBinary is the executable launched by runContainerdDaemon.
containerdBinary = "docker-containerd"
// containerdPidFilename is the pid file kept inside the state directory.
containerdPidFilename = "docker-containerd.pid"
// containerdSockFilename is the socket name inside the state directory that
// New uses when no rpc address was configured.
containerdSockFilename = "docker-containerd.sock"
// containerdStateDir is the sub-directory passed to containerd as --state-dir.
containerdStateDir = "containerd"
// eventTimestampFilename is the file (inside the state directory) holding
// the timestamp of the last handled containerd event.
eventTimestampFilename = "event.ts"
)
// remote holds the connection to a containerd daemon and, optionally, the
// state needed to start, supervise and stop that daemon as a child process.
type remote struct {
// The embedded lock is taken by Client (write) and handleEventStream (read)
// around accesses to clients.
sync.RWMutex
// apiClient is the containerd gRPC API client built on top of rpcConn.
apiClient containerd.APIClient
// daemonPid is the pid of the containerd process this remote manages; it is
// initialised to -1 in New, meaning "no managed daemon".
daemonPid int
// stateDir is the directory holding the pid file, the default socket, the
// event timestamp file and containerd's own state directory.
stateDir string
// rpcAddr is the unix socket path used to reach containerd.
rpcAddr string
// startDaemon tells New whether to launch containerd itself.
startDaemon bool
// closeManually is set by Cleanup so that the monitoring goroutines treat
// the subsequent connection loss as intentional.
closeManually bool
// debugLog adds --debug to the containerd command line.
debugLog bool
// rpcConn is the gRPC connection dialed in New.
rpcConn *grpc.ClientConn
// clients lists every client created through Client.
clients []*client
// eventTsPath is the full path of the event timestamp file.
eventTsPath string
// runtime, when non-empty, is passed to containerd as --runtime.
runtime string
// runtimeArgs are each passed to containerd as a --runtime-args value.
runtimeArgs []string
// daemonWaitCh is closed once the containerd child started by
// runContainerdDaemon has exited and been reaped.
daemonWaitCh chan struct{}
// liveRestore is copied into every client created by Client.
liveRestore bool
// oomScore is the value handed to setOOMScore for the containerd process.
oomScore int
// restoreFromTimestamp is the last recorded event timestamp, captured in New.
restoreFromTimestamp *timestamp.Timestamp
}
// New builds a libcontainerd remote rooted at stateDir, applies the given
// options, optionally starts containerd, connects to it over its unix socket
// and launches the health-check and event-monitoring goroutines.
//
// Any error returned is wrapped with a hint about checking the containerd
// installation and address.
func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
	// Decorate whichever error ends up being returned.
	defer func() {
		if err == nil {
			return
		}
		err = fmt.Errorf("Failed to connect to containerd. Please make sure containerd is installed in your PATH or you have specified the correct address. Got error: %v", err)
	}()

	r := &remote{
		daemonPid:   -1,
		stateDir:    stateDir,
		eventTsPath: filepath.Join(stateDir, eventTimestampFilename),
	}

	for i := range options {
		if applyErr := options[i].Apply(r); applyErr != nil {
			return nil, applyErr
		}
	}

	if mkdirErr := system.MkdirAll(stateDir, 0700); mkdirErr != nil {
		return nil, mkdirErr
	}

	// Fall back to the socket inside the state directory.
	if r.rpcAddr == "" {
		r.rpcAddr = filepath.Join(stateDir, containerdSockFilename)
	}

	if r.startDaemon {
		if startErr := r.runContainerdDaemon(); startErr != nil {
			return nil, startErr
		}
	}

	// Silence grpc's own reconnect logging.
	grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))

	unixDialer := func(addr string, timeout time.Duration) (net.Conn, error) {
		return net.DialTimeout("unix", addr, timeout)
	}
	conn, dialErr := grpc.Dial(r.rpcAddr, grpc.WithInsecure(), grpc.WithDialer(unixDialer))
	if dialErr != nil {
		return nil, fmt.Errorf("error connecting to containerd: %v", dialErr)
	}
	r.rpcConn = conn
	r.apiClient = containerd.NewAPIClient(conn)

	// Remember the timestamp events should be restored from. A conversion
	// failure is only logged; the (possibly nil) result is stored regardless.
	restoreTs, convErr := ptypes.TimestampProto(r.getLastEventTimestamp())
	if convErr != nil {
		logrus.Errorf("libcontainerd: failed to convert timestamp: %q", convErr)
	}
	r.restoreFromTimestamp = restoreTs

	go r.handleConnectionChange()

	if monitorErr := r.startEventsMonitor(); monitorErr != nil {
		return nil, monitorErr
	}
	return r, nil
}
// UpdateOptions applies each option to the remote in order and stops at the
// first one that fails, returning its error.
func (r *remote) UpdateOptions(options ...RemoteOption) error {
	for i := range options {
		err := options[i].Apply(r)
		if err != nil {
			return err
		}
	}
	return nil
}
// handleConnectionChange polls containerd's gRPC health endpoint every 500ms.
// When this remote manages a containerd process (daemonPid != -1) it kills
// and restarts that process once maxConnectionRetryCount failed checks have
// accumulated. It only returns when a health check fails for a managed
// daemon after closeManually has been set.
func (r *remote) handleConnectionChange() {
// Failed health checks seen since the last restart attempt. It is reset only
// when the threshold is reached, not when a check succeeds.
var transientFailureCount = 0
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
healthClient := grpc_health_v1.NewHealthClient(r.rpcConn)
for {
<-ticker.C
// Bound each check so a hung daemon cannot stall the loop indefinitely.
ctx, cancel := context.WithTimeout(context.Background(), containerdHealthCheckTimeout)
_, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
cancel()
if err == nil {
continue
}
logrus.Debugf("libcontainerd: containerd health check returned error: %v", err)
// Failures are only acted upon when we manage a containerd process.
if r.daemonPid != -1 {
if r.closeManually {
// The shutdown was requested through closeManually, so stop monitoring.
return
}
// Every other failure is treated as transient and merely counted until
// the retry threshold is reached.
transientFailureCount++
if transientFailureCount >= maxConnectionRetryCount {
transientFailureCount = 0
if system.IsProcessAlive(r.daemonPid) {
system.KillProcess(r.daemonPid)
}
// Wait for the reaper goroutine to signal that the old process is gone.
// NOTE(review): daemonWaitCh is only created when runContainerdDaemon
// starts a new process; if it adopted an already running containerd the
// channel looks to be nil and this receive would block forever - confirm.
<-r.daemonWaitCh
if err := r.runContainerdDaemon(); err != nil { //FIXME: Handle error
logrus.Errorf("libcontainerd: error restarting containerd: %v", err)
}
continue
}
}
}
}
// Cleanup shuts down the managed containerd process, if there is one: it
// closes the gRPC connection, sends SIGTERM, waits up to
// containerdShutdownTimeout for the process to exit, escalates to SIGKILL if
// it is still alive, and finally removes the pid and socket files.
func (r *remote) Cleanup() {
	if r.daemonPid == -1 {
		// No managed daemon, nothing to do.
		return
	}

	// Flag the shutdown as intentional before tearing the connection down.
	r.closeManually = true
	r.rpcConn.Close()

	// Politely ask the daemon to quit.
	syscall.Kill(r.daemonPid, syscall.SIGTERM)

	// Poll once per second until the process is gone or the timeout elapses.
	for waited := time.Duration(0); waited < containerdShutdownTimeout && system.IsProcessAlive(r.daemonPid); waited += time.Second {
		time.Sleep(time.Second)
	}

	if system.IsProcessAlive(r.daemonPid) {
		logrus.Warnf("libcontainerd: containerd (%d) didn't stop within 15 secs, killing it\n", r.daemonPid)
		syscall.Kill(r.daemonPid, syscall.SIGKILL)
	}

	// Remove the files the daemon leaves behind in the state directory.
	for _, name := range []string{containerdPidFilename, containerdSockFilename} {
		os.Remove(filepath.Join(r.stateDir, name))
	}
}
// Client creates a new client bound to this remote and the given backend,
// registers it in the remote's client list and returns it. The returned
// error is always nil.
func (r *remote) Client(b Backend) (Client, error) {
	c := &client{
		remote:        r,
		liveRestore:   r.liveRestore,
		exitNotifiers: make(map[string]*exitNotifier),
		clientCommon: clientCommon{
			locker:     locker.New(),
			containers: make(map[string]*container),
			backend:    b,
		},
	}

	// The client list is shared with the event-handling goroutine.
	r.Lock()
	r.clients = append(r.clients, c)
	r.Unlock()

	return c, nil
}
// updateEventTimestamp persists t, in its text encoding, to the event
// timestamp file so that event replay can resume from it later. Failures are
// logged and otherwise ignored.
func (r *remote) updateEventTimestamp(t time.Time) {
	// Encode before touching the file: opening it truncates the previous
	// content, so an encoding failure must not wipe the last good timestamp.
	b, err := t.MarshalText()
	if err != nil {
		logrus.Warnf("libcontainerd: failed to encode timestamp: %v", err)
		return
	}

	f, err := os.OpenFile(r.eventTsPath, syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC, 0600)
	if err != nil {
		logrus.Warnf("libcontainerd: failed to open event timestamp file: %v", err)
		return
	}
	defer f.Close()

	// (*os.File).Write reports a non-nil error on a short write, so checking
	// the error alone is sufficient.
	if _, err := f.Write(b); err != nil {
		logrus.Warnf("libcontainerd: failed to update event timestamp file: %v", err)
		// Leave an empty file rather than a partially written timestamp.
		f.Truncate(0)
	}
}
// getLastEventTimestamp returns the timestamp stored in the event timestamp
// file. If the file is missing, empty, unreadable or does not contain a valid
// timestamp, the current time is returned instead.
func (r *remote) getLastEventTimestamp() time.Time {
	now := time.Now()

	fi, err := os.Stat(r.eventTsPath)
	if err != nil {
		// fi is nil on any Stat failure, not only "not exist", so it must
		// not be dereferenced here.
		if !os.IsNotExist(err) {
			logrus.Warnf("libcontainerd: Unable to access last event ts: %v", err)
		}
		return now
	}
	if fi.Size() == 0 {
		return now
	}

	f, err := os.Open(r.eventTsPath)
	if err != nil {
		logrus.Warnf("libcontainerd: Unable to access last event ts: %v", err)
		return now
	}
	defer f.Close()

	// ReadFull keeps reading until the buffer is filled; a single Read is
	// allowed to return fewer bytes than requested.
	b := make([]byte, fi.Size())
	if _, err := io.ReadFull(f, b); err != nil {
		logrus.Warnf("libcontainerd: Unable to read last event ts: %v", err)
		return now
	}

	// Decode into a separate value so a corrupt file cannot clobber the
	// fallback.
	var ts time.Time
	if err := ts.UnmarshalText(b); err != nil {
		logrus.Warnf("libcontainerd: Unable to parse last event ts: %v", err)
		return now
	}
	return ts
}
// startEventsMonitor subscribes to containerd's event stream, starting from
// the last recorded event timestamp, and hands the stream to
// handleEventStream in a new goroutine. It returns an error only when the
// subscription itself fails.
func (r *remote) startEventsMonitor() error {
	// Replay from the last event we know about. A conversion failure is only
	// logged; the request is still sent with whatever value came back.
	since, err := ptypes.TimestampProto(r.getLastEventTimestamp())
	if err != nil {
		logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err)
	}

	req := &containerd.EventsRequest{Timestamp: since}
	stream, err := r.apiClient.Events(context.Background(), req, grpc.FailFast(false))
	if err != nil {
		return err
	}

	go r.handleEventStream(stream)
	return nil
}
// handleEventStream receives events from the given containerd event stream,
// dispatches each one to the container it refers to and then records the
// event's timestamp. It returns as soon as Recv fails; unless that failure is
// the connection closing after a manual shutdown, it first spawns
// startEventsMonitor in a new goroutine to resubscribe.
func (r *remote) handleEventStream(events containerd.API_EventsClient) {
for {
e, err := events.Recv()
if err != nil {
if grpc.ErrorDesc(err) == transport.ErrConnClosing.Desc &&
r.closeManually {
// ignore error if grpc remote connection is closed manually
return
}
logrus.Errorf("libcontainerd: failed to receive event from containerd: %v", err)
// Resubscribe in the background; the error returned by
// startEventsMonitor is discarded here.
go r.startEventsMonitor()
return
}
logrus.Debugf("libcontainerd: received containerd event: %#v", e)
// Note: this local shadows the container type for the rest of the iteration.
var container *container
var c *client
// Ask each registered client for the container, stopping at the first one
// that reports no error.
r.RLock()
for _, c = range r.clients {
container, err = c.getContainer(e.Id)
if err == nil {
break
}
}
r.RUnlock()
if container == nil {
// No client knows this container; the event timestamp is not recorded.
logrus.Warnf("libcontainerd: unknown container %s", e.Id)
continue
}
if err := container.handleEvent(e); err != nil {
logrus.Errorf("libcontainerd: error processing state change for %s: %v", e.Id, err)
}
// Persist the event time even if handling failed above.
tsp, err := ptypes.Timestamp(e.Timestamp)
if err != nil {
logrus.Errorf("libcontainerd: failed to convert event timestamp: %q", err)
continue
}
r.updateEventTimestamp(tsp)
}
}
// runContainerdDaemon makes sure a containerd process is running for this
// remote. If the pid file names a process that is still alive, that process
// is adopted and nothing is started. Otherwise a new containerd is launched,
// a goroutine is started to reap it, its OOM score is set and its pid is
// written to the pid file. On success r.daemonPid holds the daemon's pid.
func (r *remote) runContainerdDaemon() error {
pidFilename := filepath.Join(r.stateDir, containerdPidFilename)
f, err := os.OpenFile(pidFilename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
defer f.Close()
// The pid file may be left over from a previous run: read at most 8 bytes
// of it and check whether the daemon it names is still alive.
b := make([]byte, 8)
n, err := f.Read(b)
if err != nil && err != io.EOF {
return err
}
if n > 0 {
pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
if err != nil {
return err
}
if system.IsProcessAlive(int(pid)) {
// Adopt the running instance. No reaper goroutine is started and
// daemonWaitCh is left untouched on this path.
logrus.Infof("libcontainerd: previous instance of containerd still alive (%d)", pid)
r.daemonPid = int(pid)
return nil
}
}
// rewind the file
_, err = f.Seek(0, os.SEEK_SET)
if err != nil {
return err
}
// Truncate it
err = f.Truncate(0)
if err != nil {
return err
}
// Start a new instance
args := []string{
"-l", fmt.Sprintf("unix://%s", r.rpcAddr),
"--metrics-interval=0",
"--start-timeout", "2m",
"--state-dir", filepath.Join(r.stateDir, containerdStateDir),
}
// Solaris uses a fixed shim and runtime; elsewhere the docker-specific shim
// is used and the runtime is only passed when one was configured.
if goruntime.GOOS == "solaris" {
args = append(args, "--shim", "containerd-shim", "--runtime", "runc")
} else {
args = append(args, "--shim", "docker-containerd-shim")
if r.runtime != "" {
args = append(args, "--runtime")
args = append(args, r.runtime)
}
}
if r.debugLog {
args = append(args, "--debug")
}
// Each runtime argument gets its own --runtime-args flag.
if len(r.runtimeArgs) > 0 {
for _, v := range r.runtimeArgs {
args = append(args, "--runtime-args")
args = append(args, v)
}
logrus.Debugf("libcontainerd: runContainerdDaemon: runtimeArgs: %s", args)
}
cmd := exec.Command(containerdBinary, args...)
// redirect containerd logs to docker logs
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.SysProcAttr = setSysProcAttr(true)
cmd.Env = nil
// clear the NOTIFY_SOCKET from the env when starting containerd
for _, e := range os.Environ() {
if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
cmd.Env = append(cmd.Env, e)
}
}
if err := cmd.Start(); err != nil {
return err
}
// unless strictly necessary, do not add anything in between here
// as the reaper goroutine below needs to kick in as soon as possible
// and any "return" from code paths added here will defeat the reaper
// process.
r.daemonWaitCh = make(chan struct{})
go func() {
cmd.Wait()
close(r.daemonWaitCh)
}() // Reap our child when needed
logrus.Infof("libcontainerd: new containerd process, pid: %d", cmd.Process.Pid)
// From here on a failure kills the child; the reaper goroutine started
// above is what collects it.
if err := setOOMScore(cmd.Process.Pid, r.oomScore); err != nil {
system.KillProcess(cmd.Process.Pid)
return err
}
if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
system.KillProcess(cmd.Process.Pid)
return err
}
r.daemonPid = cmd.Process.Pid
return nil
}
// WithRemoteAddr returns an option that points the remote at an external
// containerd socket.
func WithRemoteAddr(addr string) RemoteOption {
	return rpcAddr(addr)
}

// rpcAddr is the RemoteOption carrying a containerd socket address.
type rpcAddr string

// Apply stores the address on r, which must be a *remote.
func (a rpcAddr) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithRemoteAddr option not supported for this remote")
	}
	rem.rpcAddr = string(a)
	return nil
}
// WithRuntimePath sets the path of the runtime to be used as the
// default by containerd.
func WithRuntimePath(rt string) RemoteOption {
	return runtimePath(rt)
}

// runtimePath is the RemoteOption carrying the default runtime path.
type runtimePath string

// Apply stores the runtime path on r, which must be a *remote.
func (rt runtimePath) Apply(r Remote) error {
	if remote, ok := r.(*remote); ok {
		remote.runtime = string(rt)
		return nil
	}
	// Name the option after its constructor, as every other option does.
	return fmt.Errorf("WithRuntimePath option not supported for this remote")
}
// WithRuntimeArgs returns an option carrying the runtime arguments that are
// passed on to containerd.
func WithRuntimeArgs(args []string) RemoteOption {
	return runtimeArgs(args)
}

// runtimeArgs is the RemoteOption carrying the list of runtime arguments.
type runtimeArgs []string

// Apply stores the argument list on r, which must be a *remote.
func (rt runtimeArgs) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithRuntimeArgs option not supported for this remote")
	}
	rem.runtimeArgs = []string(rt)
	return nil
}
// WithStartDaemon returns an option controlling whether libcontainerd also
// runs the containerd daemon itself.
func WithStartDaemon(start bool) RemoteOption {
	return startDaemon(start)
}

// startDaemon is the RemoteOption carrying the start-daemon flag.
type startDaemon bool

// Apply stores the flag on r, which must be a *remote.
func (s startDaemon) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithStartDaemon option not supported for this remote")
	}
	rem.startDaemon = bool(s)
	return nil
}
// WithDebugLog returns an option controlling whether containerd debug logs
// are enabled for the daemon.
func WithDebugLog(debug bool) RemoteOption {
	return debugLog(debug)
}

// debugLog is the RemoteOption carrying the debug-log flag.
type debugLog bool

// Apply stores the flag on r, which must be a *remote.
func (d debugLog) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithDebugLog option not supported for this remote")
	}
	rem.debugLog = bool(d)
	return nil
}
// WithLiveRestore returns an option defining whether containers are stopped
// on shutdown or restored.
func WithLiveRestore(v bool) RemoteOption {
	return liveRestore(v)
}

// liveRestore is the RemoteOption carrying the live-restore flag.
type liveRestore bool

// Apply stores the flag on r, which must be a *remote, and propagates it to
// every client already registered with that remote.
func (l liveRestore) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithLiveRestore option not supported for this remote")
	}
	enabled := bool(l)
	rem.liveRestore = enabled
	for i := range rem.clients {
		rem.clients[i].liveRestore = enabled
	}
	return nil
}
// WithOOMScore returns an option defining the oom_score_adj to set for the
// containerd process.
func WithOOMScore(score int) RemoteOption {
	return oomScore(score)
}

// oomScore is the RemoteOption carrying the OOM score adjustment.
type oomScore int

// Apply stores the score on r, which must be a *remote.
func (o oomScore) Apply(r Remote) error {
	rem, ok := r.(*remote)
	if !ok {
		return fmt.Errorf("WithOOMScore option not supported for this remote")
	}
	rem.oomScore = int(o)
	return nil
}