Merge pull request #27169 from tonistiigi/move-restartmanager

move restartmanager out of libcontainerd
Upstream-commit: 058556d7fc0e7b33f53523069e796ccf1e22229f
Component: engine
This commit is contained in:
Alexander Morozov
2016-10-07 13:41:12 -07:00
committed by GitHub
17 changed files with 107 additions and 216 deletions

View File

@ -292,9 +292,7 @@ func (container *Container) GetRootResourcePath(path string) (string, error) {
// ExitOnNext signals to the monitor that it should not restart the container
// after we send the kill signal.
func (container *Container) ExitOnNext() {
if container.restartManager != nil {
container.restartManager.Cancel()
}
container.RestartManager().Cancel()
}
// HostConfigPath returns the path to the container's JSON hostconfig
@ -545,7 +543,7 @@ func copyEscapable(dst io.Writer, src io.ReadCloser, keys []byte) (written int64
// ShouldRestart decides whether the daemon should restart the container or not.
// This is based on the container's restart policy.
func (container *Container) ShouldRestart() bool {
shouldRestart, _, _ := container.restartManager.ShouldRestart(uint32(container.ExitCode()), container.HasBeenManuallyStopped, container.FinishedAt.Sub(container.StartedAt))
shouldRestart, _, _ := container.RestartManager().ShouldRestart(uint32(container.ExitCode()), container.HasBeenManuallyStopped, container.FinishedAt.Sub(container.StartedAt))
return shouldRestart
}
@ -941,7 +939,7 @@ func (container *Container) UpdateMonitor(restartPolicy containertypes.RestartPo
SetPolicy(containertypes.RestartPolicy)
}
if rm, ok := container.RestartManager(false).(policySetter); ok {
if rm, ok := container.RestartManager().(policySetter); ok {
rm.SetPolicy(restartPolicy)
}
}
@ -956,18 +954,24 @@ func (container *Container) FullHostname() string {
}
// RestartManager returns the current restartmanager instance connected to container.
func (container *Container) RestartManager(reset bool) restartmanager.RestartManager {
if reset {
container.RestartCount = 0
container.restartManager = nil
}
func (container *Container) RestartManager() restartmanager.RestartManager {
if container.restartManager == nil {
container.restartManager = restartmanager.New(container.HostConfig.RestartPolicy, container.RestartCount)
}
return container.restartManager
}
// ResetRestartManager discards the container's current restartmanager.
// An existing instance is cancelled first; when resetCount is true the
// container's RestartCount is also set back to zero. No replacement manager
// is built here: the field is left nil, and RestartManager() creates one from
// HostConfig.RestartPolicy and RestartCount when it finds the field nil.
func (container *Container) ResetRestartManager(resetCount bool) {
if container.restartManager != nil {
container.restartManager.Cancel()
}
if resetCount {
container.RestartCount = 0
}
container.restartManager = nil
}
type attachContext struct {
ctx context.Context
cancel context.CancelFunc

View File

@ -189,12 +189,13 @@ func (daemon *Daemon) restore() error {
logrus.Errorf("Failed to migrate old mounts to use new spec format")
}
rm := c.RestartManager(false)
if c.IsRunning() || c.IsPaused() {
if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil {
c.RestartManager().Cancel() // manually start containers because some need to wait for swarm networking
if err := daemon.containerd.Restore(c.ID); err != nil {
logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err)
return
}
c.ResetRestartManager(false)
if !c.HostConfig.NetworkMode.IsContainer() && c.IsRunning() {
options, err := daemon.buildSandboxOptions(c)
if err != nil {
@ -300,7 +301,7 @@ func (daemon *Daemon) restore() error {
// Make sure networks are available before starting
daemon.waitForNetworks(c)
if err := daemon.containerStart(c, ""); err != nil {
if err := daemon.containerStart(c, "", true); err != nil {
logrus.Errorf("Failed to start container %s: %s", c.ID, err)
}
close(chNotify)
@ -372,7 +373,7 @@ func (daemon *Daemon) RestartSwarmContainers() {
group.Add(1)
go func(c *container.Container) {
defer group.Done()
if err := daemon.containerStart(c, ""); err != nil {
if err := daemon.containerStart(c, "", true); err != nil {
logrus.Error(err)
}
}(c)

View File

@ -6,11 +6,13 @@ import (
"io"
"runtime"
"strconv"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/restartmanager"
"github.com/docker/docker/runconfig"
)
@ -31,43 +33,57 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
daemon.LogContainerEvent(c, "oom")
case libcontainerd.StateExit:
// if container's AutoRemove flag is set, remove it after clean up
if c.HostConfig.AutoRemove {
defer func() {
autoRemove := func() {
if c.HostConfig.AutoRemove {
if err := daemon.ContainerRm(c.ID, &types.ContainerRmConfig{ForceRemove: true, RemoveVolume: true}); err != nil {
logrus.Errorf("can't remove container %s: %v", c.ID, err)
}
}()
}
}
c.Lock()
defer c.Unlock()
c.Wait()
c.Reset(false)
c.SetStopped(platformConstructExitStatus(e))
restart, wait, err := c.RestartManager().ShouldRestart(e.ExitCode, false, time.Since(c.StartedAt))
if err == nil && restart {
c.RestartCount++
c.SetRestarting(platformConstructExitStatus(e))
} else {
c.SetStopped(platformConstructExitStatus(e))
defer autoRemove()
}
daemon.updateHealthMonitor(c)
attributes := map[string]string{
"exitCode": strconv.Itoa(int(e.ExitCode)),
}
daemon.updateHealthMonitor(c)
daemon.LogContainerEventWithAttributes(c, "die", attributes)
daemon.Cleanup(c)
// FIXME: here is race condition between two RUN instructions in Dockerfile
// because they share same runconfig and change image. Must be fixed
// in builder/builder.go
if err == nil && restart {
go func() {
err := <-wait
if err == nil {
if err = daemon.containerStart(c, "", false); err != nil {
logrus.Debugf("failed to restart contianer: %+v", err)
}
}
if err != nil {
c.SetStopped(platformConstructExitStatus(e))
defer autoRemove()
if err != restartmanager.ErrRestartCanceled {
logrus.Errorf("restartmanger wait error: %+v", err)
}
}
}()
}
defer c.Unlock()
if err := c.ToDisk(); err != nil {
return err
}
return daemon.postRunProcessing(c, e)
case libcontainerd.StateRestart:
c.Lock()
defer c.Unlock()
c.Reset(false)
c.RestartCount++
c.SetRestarting(platformConstructExitStatus(e))
attributes := map[string]string{
"exitCode": strconv.Itoa(int(e.ExitCode)),
}
daemon.LogContainerEventWithAttributes(c, "die", attributes)
daemon.updateHealthMonitor(c)
return c.ToDisk()
case libcontainerd.StateExitProcess:
c.Lock()
defer c.Unlock()

View File

@ -32,7 +32,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon
}
if copts != nil {
newOpts = append(newOpts, *copts...)
newOpts = append(newOpts, copts...)
}
// Create a new servicing container, which will start, complete the update, and merge back the

View File

@ -56,7 +56,7 @@ func (daemon *Daemon) containerRestart(container *container.Container, seconds i
}
}
if err := daemon.containerStart(container, ""); err != nil {
if err := daemon.containerStart(container, "", true); err != nil {
return err
}

View File

@ -14,7 +14,6 @@ import (
"github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/container"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/runconfig"
)
@ -78,23 +77,23 @@ func (daemon *Daemon) ContainerStart(name string, hostConfig *containertypes.Hos
return err
}
return daemon.containerStart(container, checkpoint)
return daemon.containerStart(container, checkpoint, true)
}
// Start starts a container
func (daemon *Daemon) Start(container *container.Container) error {
return daemon.containerStart(container, "")
return daemon.containerStart(container, "", true)
}
// containerStart prepares the container to run by setting up everything the
// container needs, such as storage and networking, as well as links
// between containers. The container is left waiting for a signal to
// begin running.
func (daemon *Daemon) containerStart(container *container.Container, checkpoint string) (err error) {
func (daemon *Daemon) containerStart(container *container.Container, checkpoint string, resetRestartManager bool) (err error) {
container.Lock()
defer container.Unlock()
if container.Running {
if resetRestartManager && container.Running { // skip this check if already in restarting step and resetRestartManager==false
return nil
}
@ -141,13 +140,13 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
return err
}
createOptions := []libcontainerd.CreateOption{libcontainerd.WithRestartManager(container.RestartManager(true))}
copts, err := daemon.getLibcontainerdCreateOptions(container)
createOptions, err := daemon.getLibcontainerdCreateOptions(container)
if err != nil {
return err
}
if copts != nil {
createOptions = append(createOptions, *copts...)
if resetRestartManager {
container.ResetRestartManager(true)
}
if err := daemon.containerd.Create(container.ID, checkpoint, container.CheckpointDir(), *spec, createOptions...); err != nil {

View File

@ -7,7 +7,7 @@ import (
"github.com/docker/docker/libcontainerd"
)
func (daemon *Daemon) getLibcontainerdCreateOptions(container *container.Container) (*[]libcontainerd.CreateOption, error) {
func (daemon *Daemon) getLibcontainerdCreateOptions(container *container.Container) ([]libcontainerd.CreateOption, error) {
createOptions := []libcontainerd.CreateOption{}
// Ensure a runtime has been assigned to this container
@ -25,5 +25,5 @@ func (daemon *Daemon) getLibcontainerdCreateOptions(container *container.Contain
}
createOptions = append(createOptions, libcontainerd.WithRuntime(rt.Path, rt.Args))
return &createOptions, nil
return createOptions, nil
}

View File

@ -17,7 +17,7 @@ const (
credentialSpecFileLocation = "CredentialSpecs"
)
func (daemon *Daemon) getLibcontainerdCreateOptions(container *container.Container) (*[]libcontainerd.CreateOption, error) {
func (daemon *Daemon) getLibcontainerdCreateOptions(container *container.Container) ([]libcontainerd.CreateOption, error) {
createOptions := []libcontainerd.CreateOption{}
// Are we going to run as a Hyper-V container?
@ -139,7 +139,7 @@ func (daemon *Daemon) getLibcontainerdCreateOptions(container *container.Contain
createOptions = append(createOptions, &libcontainerd.NetworkEndpointsOption{Endpoints: epList, AllowUnqualifiedDNSQuery: AllowUnqualifiedDNSQuery})
}
return &createOptions, nil
return createOptions, nil
}
// getCredentialSpec is a helper function to get the value of a credential spec supplied

View File

@ -138,13 +138,8 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir
clnt.lock(containerID)
defer clnt.unlock(containerID)
if ctr, err := clnt.getContainer(containerID); err == nil {
if ctr.restarting {
ctr.restartManager.Cancel()
ctr.clean()
} else {
return fmt.Errorf("Container %s is already active", containerID)
}
if _, err := clnt.getContainer(containerID); err == nil {
return fmt.Errorf("Container %s is already active", containerID)
}
uid, gid, err := getRootIDs(specs.Spec(spec))

View File

@ -1,12 +1,5 @@
package libcontainerd
import (
"fmt"
"time"
"github.com/docker/docker/restartmanager"
)
const (
// InitFriendlyName is the name given in the lookup map of processes
// for the first process started in a container.
@ -16,25 +9,5 @@ const (
type containerCommon struct {
process
restartManager restartmanager.RestartManager
restarting bool
processes map[string]*process
startedAt time.Time
}
// WithRestartManager sets the restartmanager to be used with the container.
func WithRestartManager(rm restartmanager.RestartManager) CreateOption {
return restartManager{rm}
}
type restartManager struct {
rm restartmanager.RestartManager
}
func (rm restartManager) Apply(p interface{}) error {
if pr, ok := p.(*container); ok {
pr.restartManager = rm.rm
return nil
}
return fmt.Errorf("WithRestartManager option not supported for this client")
processes map[string]*process
}

View File

@ -7,12 +7,10 @@ import (
"os"
"path/filepath"
"syscall"
"time"
"github.com/Sirupsen/logrus"
containerd "github.com/docker/containerd/api/grpc/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/restartmanager"
"github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/net/context"
)
@ -137,7 +135,6 @@ func (ctr *container) start(checkpoint string, checkpointDir string) error {
ctr.closeFifos(iopipe)
return err
}
ctr.startedAt = time.Now()
ctr.systemPid = systemPid(resp.Container)
close(createChan)
@ -164,7 +161,6 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
defer ctr.client.unlock(ctr.containerID)
switch e.Type {
case StateExit, StatePause, StateResume, StateOOM:
var waitRestart chan error
st := StateInfo{
CommonStateInfo: CommonStateInfo{
State: e.Type,
@ -179,20 +175,8 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
st.ProcessID = e.Pid
st.State = StateExitProcess
}
if st.State == StateExit && ctr.restartManager != nil {
restart, wait, err := ctr.restartManager.ShouldRestart(e.Status, false, time.Since(ctr.startedAt))
if err != nil {
logrus.Warnf("libcontainerd: container %s %v", ctr.containerID, err)
} else if restart {
st.State = StateRestart
ctr.restarting = true
ctr.client.deleteContainer(e.Id)
waitRestart = wait
}
}
// Remove process from list if we have exited
// We need to do so here in case the Message Handler decides to restart it.
switch st.State {
case StateExit:
ctr.clean()
@ -204,32 +188,6 @@ func (ctr *container) handleEvent(e *containerd.Event) error {
if err := ctr.client.backend.StateChanged(e.Id, st); err != nil {
logrus.Errorf("libcontainerd: backend.StateChanged(): %v", err)
}
if st.State == StateRestart {
go func() {
err := <-waitRestart
ctr.client.lock(ctr.containerID)
defer ctr.client.unlock(ctr.containerID)
ctr.restarting = false
if err == nil {
if err = ctr.start("", ""); err != nil {
logrus.Errorf("libcontainerd: error restarting %v", err)
}
}
if err != nil {
st.State = StateExit
ctr.clean()
ctr.client.q.append(e.Id, func() {
if err := ctr.client.backend.StateChanged(e.Id, st); err != nil {
logrus.Errorf("libcontainerd: %v", err)
}
})
if err != restartmanager.ErrRestartCanceled {
logrus.Errorf("libcontainerd: %v", err)
}
}
}()
}
if e.Type == StatePause || e.Type == StateResume {
ctr.pauseMonitor.handle(e.Type)
}

View File

@ -91,7 +91,6 @@ func (ctr *container) start() error {
}
return err
}
ctr.startedAt = time.Now()
pid := newProcess.Pid()
@ -194,7 +193,6 @@ func (ctr *container) waitProcessExitCode(process *process) int {
// equivalent to (in the linux containerd world) where events come in for
// state change notifications from containerd.
func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) error {
var waitRestart chan error
logrus.Debugln("libcontainerd: waitExit() on pid", process.systemPid)
exitCode := ctr.waitProcessExitCode(process)
@ -234,20 +232,7 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
logrus.Error(err)
}
if !ctr.manualStopRequested && ctr.restartManager != nil {
restart, wait, err := ctr.restartManager.ShouldRestart(uint32(exitCode), false, time.Since(ctr.startedAt))
if err != nil {
logrus.Error(err)
} else if restart {
si.State = StateRestart
ctr.restarting = true
ctr.client.deleteContainer(ctr.containerID)
waitRestart = wait
}
}
// Remove process from list if we have exited
// We need to do so here in case the Message Handler decides to restart it.
if si.State == StateExit {
ctr.client.deleteContainer(ctr.containerID)
}
@ -268,24 +253,6 @@ func (ctr *container) waitExit(process *process, isFirstProcessToStart bool) err
logrus.Debugf("libcontainerd: waitExit() completed OK, %+v", si)
if si.State == StateRestart {
go func() {
err := <-waitRestart
ctr.restarting = false
if err == nil {
if err = ctr.client.Create(ctr.containerID, "", "", ctr.ociSpec, ctr.options...); err != nil {
logrus.Errorf("libcontainerd: error restarting %v", err)
}
}
if err != nil {
si.State = StateExit
if err := ctr.client.backend.StateChanged(ctr.containerID, si); err != nil {
logrus.Error(err)
}
}
}()
}
return nil
}

View File

@ -13,7 +13,6 @@ const (
StatePause = "pause"
StateResume = "resume"
StateExit = "exit"
StateRestart = "restart"
StateRestore = "restore"
StateStartProcess = "start-process"
StateExitProcess = "exit-process"

View File

@ -32,14 +32,12 @@ type eventLogger func(id, name, action string)
// Manager controls the plugin subsystem.
type Manager struct {
sync.RWMutex
libRoot string
runRoot string
pluginStore *store.Store
containerdClient libcontainerd.Client
registryService registry.Service
liveRestore bool
shutdown bool
pluginEventLogger eventLogger
}
@ -83,17 +81,20 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
switch e.State {
case libcontainerd.StateExit:
var shutdown bool
pm.RLock()
shutdown = pm.shutdown
pm.RUnlock()
if shutdown {
p, err := pm.pluginStore.GetByID(id)
if err != nil {
return err
}
p, err := pm.pluginStore.GetByID(id)
if err != nil {
return err
}
p.RLock()
if p.ExitChan != nil {
close(p.ExitChan)
}
restart := p.Restart
p.RUnlock()
p.RemoveFromDisk()
if restart {
pm.enable(p, true)
}
}
return nil

View File

@ -9,12 +9,9 @@ import (
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/oci"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/plugin/v2"
"github.com/docker/docker/restartmanager"
"github.com/opencontainers/runtime-spec/specs-go"
)
@ -26,20 +23,18 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error {
if err != nil {
return err
}
p.RestartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec), libcontainerd.WithRestartManager(p.RestartManager)); err != nil {
if err := p.RestartManager.Cancel(); err != nil {
logrus.Errorf("enable: restartManager.Cancel failed due to %v", err)
}
p.Lock()
p.Restart = true
p.Unlock()
if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec)); err != nil {
return err
}
p.PClient, err = plugins.NewClient("unix://"+filepath.Join(p.RuntimeSourcePath, p.GetSocket()), nil)
if err != nil {
if err := p.RestartManager.Cancel(); err != nil {
logrus.Errorf("enable: restartManager.Cancel failed due to %v", err)
}
p.Lock()
p.Restart = false
p.Unlock()
return err
}
@ -50,49 +45,37 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error {
}
func (pm *Manager) restore(p *v2.Plugin) error {
p.RestartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
return pm.containerdClient.Restore(p.GetID(), libcontainerd.WithRestartManager(p.RestartManager))
return pm.containerdClient.Restore(p.GetID())
}
func (pm *Manager) disable(p *v2.Plugin) error {
if !p.IsEnabled() {
return fmt.Errorf("plugin %s is already disabled", p.Name())
}
if err := p.RestartManager.Cancel(); err != nil {
logrus.Error(err)
}
p.Lock()
p.Restart = false
p.Unlock()
if err := pm.containerdClient.Signal(p.GetID(), int(syscall.SIGKILL)); err != nil {
logrus.Error(err)
}
if err := p.RemoveFromDisk(); err != nil {
logrus.Error(err)
}
pm.pluginStore.SetState(p, false)
return nil
}
// Shutdown stops all plugins and is called during daemon shutdown.
func (pm *Manager) Shutdown() {
pm.Lock()
pm.shutdown = true
pm.Unlock()
pm.RLock()
defer pm.RUnlock()
plugins := pm.pluginStore.GetAll()
for _, p := range plugins {
if pm.liveRestore && p.IsEnabled() {
logrus.Debug("Plugin active when liveRestore is set, skipping shutdown")
continue
}
if p.RestartManager != nil {
if err := p.RestartManager.Cancel(); err != nil {
logrus.Error(err)
}
}
if pm.containerdClient != nil && p.IsEnabled() {
pluginID := p.GetID()
p.Lock()
p.ExitChan = make(chan bool)
p.Restart = false
p.Unlock()
err := pm.containerdClient.Signal(p.PluginObj.ID, int(syscall.SIGTERM))
if err != nil {
logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
@ -108,8 +91,5 @@ func (pm *Manager) Shutdown() {
}
}
}
if err := p.RemoveFromDisk(); err != nil {
logrus.Errorf("Remove plugin runtime failed with error: %v", err)
}
}
}

View File

@ -111,13 +111,12 @@ func (ps *Store) Add(p *v2.Plugin) {
ps.Unlock()
}
// Remove removes a plugin from memory, plugindb and disk.
// Remove removes a plugin from memory and plugindb.
func (ps *Store) Remove(p *v2.Plugin) {
ps.Lock()
delete(ps.plugins, p.GetID())
delete(ps.nameToID, p.Name())
ps.updatePluginDB()
p.RemoveFromDisk()
ps.Unlock()
}

View File

@ -5,16 +5,15 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/restartmanager"
)
// Plugin represents an individual plugin.
type Plugin struct {
sync.RWMutex
PluginObj types.Plugin `json:"plugin"`
PClient *plugins.Client `json:"-"`
RestartManager restartmanager.RestartManager `json:"-"`
RuntimeSourcePath string `json:"-"`
ExitChan chan bool `json:"-"`
RefCount int `json:"-"`
PluginObj types.Plugin `json:"plugin"`
PClient *plugins.Client `json:"-"`
RuntimeSourcePath string `json:"-"`
RefCount int `json:"-"`
Restart bool `json:"-"`
ExitChan chan bool `json:"-"`
}