From cb0c1a12c4b2d386b008d2206b31fff5e888358a Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 14 Jul 2017 16:45:32 -0400 Subject: [PATCH] Decouple plugin manager from libcontainerd package libcontainerd has a bunch of platform dependent code and huge interfaces that are a pain implement. To make the plugin manager a bit easier to work with, extract the plugin executor into an interface and move the containerd implementation to a separate package. Signed-off-by: Brian Goff Upstream-commit: c85e8622a4813d7b72d74517faa03ab5de4c4550 Component: engine --- components/engine/daemon/daemon.go | 7 +- .../integration-cli/fixtures/plugin/plugin.go | 149 ++++++++++++++++ .../fixtures/plugin/plugin_linux.go | 162 ------------------ .../fixtures/plugin/plugin_unsuported.go | 19 -- .../plugin/executor/containerd/containerd.go | 77 +++++++++ components/engine/plugin/manager.go | 112 ++++++------ components/engine/plugin/manager_linux.go | 25 +-- 7 files changed, 298 insertions(+), 253 deletions(-) delete mode 100644 components/engine/integration-cli/fixtures/plugin/plugin_linux.go delete mode 100644 components/engine/integration-cli/fixtures/plugin/plugin_unsuported.go create mode 100644 components/engine/plugin/executor/containerd/containerd.go diff --git a/components/engine/daemon/daemon.go b/components/engine/daemon/daemon.go index 7208f3c5c4..19d22bf702 100644 --- a/components/engine/daemon/daemon.go +++ b/components/engine/daemon/daemon.go @@ -48,6 +48,7 @@ import ( "github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/truncindex" "github.com/docker/docker/plugin" + pluginexec "github.com/docker/docker/plugin/executor/containerd" refstore "github.com/docker/docker/reference" "github.com/docker/docker/registry" "github.com/docker/docker/runconfig" @@ -646,12 +647,16 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe } registerMetricsPluginCallback(d.PluginStore, metricsSockPath) + createPluginExec := func(m *plugin.Manager) (plugin.Executor, error) { + return pluginexec.New(containerdRemote, m) + } + // Plugin system initialization should happen before restore. Do not change order. d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{ Root: filepath.Join(config.Root, "plugins"), ExecRoot: getPluginExecRoot(config.Root), Store: d.PluginStore, - Executor: containerdRemote, + CreateExecutor: createPluginExec, RegistryService: registryService, LiveRestoreEnabled: config.LiveRestoreEnabled, LogPluginEvent: d.LogPluginEvent, // todo: make private diff --git a/components/engine/integration-cli/fixtures/plugin/plugin.go b/components/engine/integration-cli/fixtures/plugin/plugin.go index 4ab15c23de..0b13134563 100644 --- a/components/engine/integration-cli/fixtures/plugin/plugin.go +++ b/components/engine/integration-cli/fixtures/plugin/plugin.go @@ -1,9 +1,19 @@ package plugin import ( + "encoding/json" "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "time" "github.com/docker/docker/api/types" + "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/plugin" + "github.com/docker/docker/registry" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -32,3 +42,142 @@ func WithBinary(bin string) CreateOpt { type CreateClient interface { PluginCreate(context.Context, io.Reader, types.PluginCreateOptions) error } + +// Create creates a new plugin with the specified name +func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error { + tmpDir, err := ioutil.TempDir("", "create-test-plugin") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + tar, err := makePluginBundle(tmpDir, opts...) + if err != nil { + return err + } + defer tar.Close() + + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name}) +} + +// CreateInRegistry makes a plugin (locally) and pushes it to a registry. +// This does not use a dockerd instance to create or push the plugin. +// If you just want to create a plugin in some daemon, use `Create`. +// +// This can be useful when testing plugins on swarm where you don't really want +// the plugin to exist on any of the daemons (immediately) and there needs to be +// some way to distribute the plugin. +func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error { + tmpDir, err := ioutil.TempDir("", "create-test-plugin-local") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + inPath := filepath.Join(tmpDir, "plugin") + if err := os.MkdirAll(inPath, 0755); err != nil { + return errors.Wrap(err, "error creating plugin root") + } + + tar, err := makePluginBundle(inPath, opts...) + if err != nil { + return err + } + defer tar.Close() + + dummyExec := func(m *plugin.Manager) (plugin.Executor, error) { + return nil, nil + } + + regService, err := registry.NewService(registry.ServiceOptions{V2Only: true}) + if err != nil { + return err + } + + managerConfig := plugin.ManagerConfig{ + Store: plugin.NewStore(), + RegistryService: regService, + Root: filepath.Join(tmpDir, "root"), + ExecRoot: "/run/docker", // manager init fails if not set + CreateExecutor: dummyExec, + LogPluginEvent: func(id, name, action string) {}, // panics when not set + } + manager, err := plugin.NewManager(managerConfig) + if err != nil { + return errors.Wrap(err, "error creating plugin manager") + } + + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil { + return err + } + + if auth == nil { + auth = &types.AuthConfig{} + } + err = manager.Push(ctx, repo, nil, auth, ioutil.Discard) + return errors.Wrap(err, "error pushing plugin") +} + +func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) { + p := &types.PluginConfig{ + Interface: types.PluginConfigInterface{ + Socket: "basic.sock", + Types: []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}}, + }, + Entrypoint: []string{"/basic"}, + } + cfg := &Config{ + PluginConfig: p, + } + for _, o := range opts { + o(cfg) + } + if cfg.binPath == "" { + binPath, err := ensureBasicPluginBin() + if err != nil { + return nil, err + } + cfg.binPath = binPath + } + + configJSON, err := json.Marshal(p) + if err != nil { + return nil, err + } + if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil { + return nil, err + } + if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil { + return nil, errors.Wrap(err, "error creating plugin rootfs dir") + } + if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil { + return nil, errors.Wrap(err, "error copying plugin binary to rootfs path") + } + tar, err := archive.Tar(inPath, archive.Uncompressed) + return tar, errors.Wrap(err, "error making plugin archive") +} + +func ensureBasicPluginBin() (string, error) { + name := "docker-basic-plugin" + p, err := exec.LookPath(name) + if err == nil { + return p, nil + } + + goBin, err := exec.LookPath("go") + if err != nil { + return "", err + } + installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name) + cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic")) + cmd.Env = append(cmd.Env, "CGO_ENABLED=0") + if out, err := cmd.CombinedOutput(); err != nil { + return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out)) + } + return installPath, nil +} diff --git a/components/engine/integration-cli/fixtures/plugin/plugin_linux.go b/components/engine/integration-cli/fixtures/plugin/plugin_linux.go deleted file mode 100644 index 5da79fcb77..0000000000 --- a/components/engine/integration-cli/fixtures/plugin/plugin_linux.go +++ /dev/null @@ -1,162 +0,0 @@ -package plugin - -import ( - "encoding/json" - "io" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/libcontainerd" - "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/plugin" - "github.com/docker/docker/registry" - "github.com/pkg/errors" - "golang.org/x/net/context" -) - -// Create creates a new plugin with the specified name -func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error { - tmpDir, err := ioutil.TempDir("", "create-test-plugin") - if err != nil { - return err - } - defer os.RemoveAll(tmpDir) - - tar, err := makePluginBundle(tmpDir, opts...) - if err != nil { - return err - } - defer tar.Close() - - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - - return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name}) -} - -// TODO(@cpuguy83): we really shouldn't have to do this... -// The manager panics on init when `Executor` is not set. -type dummyExecutor struct{} - -func (dummyExecutor) Client(libcontainerd.Backend) (libcontainerd.Client, error) { return nil, nil } -func (dummyExecutor) Cleanup() {} -func (dummyExecutor) UpdateOptions(...libcontainerd.RemoteOption) error { return nil } - -// CreateInRegistry makes a plugin (locally) and pushes it to a registry. -// This does not use a dockerd instance to create or push the plugin. -// If you just want to create a plugin in some daemon, use `Create`. -// -// This can be useful when testing plugins on swarm where you don't really want -// the plugin to exist on any of the daemons (immediately) and there needs to be -// some way to distribute the plugin. -func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error { - tmpDir, err := ioutil.TempDir("", "create-test-plugin-local") - if err != nil { - return err - } - defer os.RemoveAll(tmpDir) - - inPath := filepath.Join(tmpDir, "plugin") - if err := os.MkdirAll(inPath, 0755); err != nil { - return errors.Wrap(err, "error creating plugin root") - } - - tar, err := makePluginBundle(inPath, opts...) - if err != nil { - return err - } - defer tar.Close() - - regService, err := registry.NewService(registry.ServiceOptions{V2Only: true}) - if err != nil { - return err - } - - managerConfig := plugin.ManagerConfig{ - Store: plugin.NewStore(), - RegistryService: regService, - Root: filepath.Join(tmpDir, "root"), - ExecRoot: "/run/docker", // manager init fails if not set - Executor: dummyExecutor{}, - LogPluginEvent: func(id, name, action string) {}, // panics when not set - } - manager, err := plugin.NewManager(managerConfig) - if err != nil { - return errors.Wrap(err, "error creating plugin manager") - } - - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil { - return err - } - - if auth == nil { - auth = &types.AuthConfig{} - } - err = manager.Push(ctx, repo, nil, auth, ioutil.Discard) - return errors.Wrap(err, "error pushing plugin") -} - -func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) { - p := &types.PluginConfig{ - Interface: types.PluginConfigInterface{ - Socket: "basic.sock", - Types: []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}}, - }, - Entrypoint: []string{"/basic"}, - } - cfg := &Config{ - PluginConfig: p, - } - for _, o := range opts { - o(cfg) - } - if cfg.binPath == "" { - binPath, err := ensureBasicPluginBin() - if err != nil { - return nil, err - } - cfg.binPath = binPath - } - - configJSON, err := json.Marshal(p) - if err != nil { - return nil, err - } - if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil { - return nil, err - } - if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil { - return nil, errors.Wrap(err, "error creating plugin rootfs dir") - } - if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil { - return nil, errors.Wrap(err, "error copying plugin binary to rootfs path") - } - tar, err := archive.Tar(inPath, archive.Uncompressed) - return tar, errors.Wrap(err, "error making plugin archive") -} - -func ensureBasicPluginBin() (string, error) { - name := "docker-basic-plugin" - p, err := exec.LookPath(name) - if err == nil { - return p, nil - } - - goBin, err := exec.LookPath("go") - if err != nil { - return "", err - } - installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name) - cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic")) - cmd.Env = append(cmd.Env, "CGO_ENABLED=0") - if out, err := cmd.CombinedOutput(); err != nil { - return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out)) - } - return installPath, nil -} diff --git a/components/engine/integration-cli/fixtures/plugin/plugin_unsuported.go b/components/engine/integration-cli/fixtures/plugin/plugin_unsuported.go deleted file mode 100644 index 7c272a317f..0000000000 --- a/components/engine/integration-cli/fixtures/plugin/plugin_unsuported.go +++ /dev/null @@ -1,19 +0,0 @@ -// +build !linux - -package plugin - -import ( - "github.com/docker/docker/api/types" - "github.com/pkg/errors" - "golang.org/x/net/context" -) - -// Create is not supported on this platform -func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error { - return errors.New("not supported on this platform") -} - -// CreateInRegistry is not supported on this platform -func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error { - return errors.New("not supported on this platform") -} diff --git a/components/engine/plugin/executor/containerd/containerd.go b/components/engine/plugin/executor/containerd/containerd.go new file mode 100644 index 0000000000..74cf530cf1 --- /dev/null +++ b/components/engine/plugin/executor/containerd/containerd.go @@ -0,0 +1,77 @@ +package containerd + +import ( + "io" + + "github.com/docker/docker/libcontainerd" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" +) + +// ExitHandler represents an object that is called when the exit event is received from containerd +type ExitHandler interface { + HandleExitEvent(id string) error +} + +// New creates a new containerd plugin executor +func New(remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) { + e := &Executor{exitHandler: exitHandler} + client, err := remote.Client(e) + if err != nil { + return nil, errors.Wrap(err, "error creating containerd exec client") + } + e.client = client + return e, nil +} + +// Executor is the containerd client implementation of a plugin executor +type Executor struct { + client libcontainerd.Client + exitHandler ExitHandler +} + +// Create creates a new container +func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error { + return e.client.Create(id, "", "", spec, attachStreamsFunc(stdout, stderr)) +} + +// Restore restores a container +func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error { + return e.client.Restore(id, attachStreamsFunc(stdout, stderr)) +} + +// IsRunning returns if the container with the given id is running +func (e *Executor) IsRunning(id string) (bool, error) { + pids, err := e.client.GetPidsForContainer(id) + return len(pids) > 0, err +} + +// Signal sends the specified signal to the container +func (e *Executor) Signal(id string, signal int) error { + return e.client.Signal(id, signal) +} + +// StateChanged handles state changes from containerd +// All events are ignored except the exit event, which is sent of to the stored handler +func (e *Executor) StateChanged(id string, event libcontainerd.StateInfo) error { + switch event.State { + case libcontainerd.StateExit: + return e.exitHandler.HandleExitEvent(id) + } + return nil +} + +func attachStreamsFunc(stdout, stderr io.WriteCloser) func(libcontainerd.IOPipe) error { + return func(iop libcontainerd.IOPipe) error { + iop.Stdin.Close() + go func() { + io.Copy(stdout, iop.Stdout) + stdout.Close() + }() + go func() { + io.Copy(stderr, iop.Stderr) + stderr.Close() + }() + return nil + } +} diff --git a/components/engine/plugin/manager.go b/components/engine/plugin/manager.go index 2281dfdd6c..0c03192d76 100644 --- a/components/engine/plugin/manager.go +++ b/components/engine/plugin/manager.go @@ -17,7 +17,6 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/image" "github.com/docker/docker/layer" - "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/mount" @@ -26,6 +25,7 @@ import ( "github.com/docker/docker/plugin/v2" "github.com/docker/docker/registry" "github.com/opencontainers/go-digest" + specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -35,6 +35,14 @@ const rootFSFileName = "rootfs" var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`) +// Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins +type Executor interface { + Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error + Restore(id string, stdout, stderr io.WriteCloser) error + IsRunning(id string) (bool, error) + Signal(id string, signal int) error +} + func (pm *Manager) restorePlugin(p *v2.Plugin) error { if p.IsEnabled() { return pm.restore(p) @@ -47,24 +55,27 @@ type eventLogger func(id, name, action string) // ManagerConfig defines configuration needed to start new manager. type ManagerConfig struct { Store *Store // remove - Executor libcontainerd.Remote RegistryService registry.Service LiveRestoreEnabled bool // TODO: remove LogPluginEvent eventLogger Root string ExecRoot string + CreateExecutor ExecutorCreator AuthzMiddleware *authorization.Middleware } +// ExecutorCreator is used in the manager config to pass in an `Executor` +type ExecutorCreator func(*Manager) (Executor, error) + // Manager controls the plugin subsystem. type Manager struct { - config ManagerConfig - mu sync.RWMutex // protects cMap - muGC sync.RWMutex // protects blobstore deletions - cMap map[*v2.Plugin]*controller - containerdClient libcontainerd.Client - blobStore *basicBlobStore - publisher *pubsub.Publisher + config ManagerConfig + mu sync.RWMutex // protects cMap + muGC sync.RWMutex // protects blobstore deletions + cMap map[*v2.Plugin]*controller + blobStore *basicBlobStore + publisher *pubsub.Publisher + executor Executor } // controller represents the manager's control on a plugin. @@ -111,10 +122,11 @@ func NewManager(config ManagerConfig) (*Manager, error) { } var err error - manager.containerdClient, err = config.Executor.Client(manager) // todo: move to another struct + manager.executor, err = config.CreateExecutor(manager) if err != nil { - return nil, errors.Wrap(err, "failed to create containerd client") + return nil, err } + manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs")) if err != nil { return nil, err @@ -133,42 +145,37 @@ func (pm *Manager) tmpDir() string { return filepath.Join(pm.config.Root, "tmp") } -// StateChanged updates plugin internals using libcontainerd events. -func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error { - logrus.Debugf("plugin state changed %s %#v", id, e) +// HandleExitEvent is called when the executor receives the exit event +// In the future we may change this, but for now all we care about is the exit event. +func (pm *Manager) HandleExitEvent(id string) error { + p, err := pm.config.Store.GetV2Plugin(id) + if err != nil { + return err + } - switch e.State { - case libcontainerd.StateExit: - p, err := pm.config.Store.GetV2Plugin(id) - if err != nil { - return err + os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)) + + if p.PropagatedMount != "" { + if err := mount.Unmount(p.PropagatedMount); err != nil { + logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err) } - - os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)) - - if p.PropagatedMount != "" { - if err := mount.Unmount(p.PropagatedMount); err != nil { - logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err) - } - propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount") - if err := mount.Unmount(propRoot); err != nil { - logrus.Warn("Could not unmount %s: %v", propRoot, err) - } - } - - pm.mu.RLock() - c := pm.cMap[p] - if c.exitChan != nil { - close(c.exitChan) - } - restart := c.restart - pm.mu.RUnlock() - - if restart { - pm.enable(p, c, true) + propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount") + if err := mount.Unmount(propRoot); err != nil { + logrus.Warn("Could not unmount %s: %v", propRoot, err) } } + pm.mu.RLock() + c := pm.cMap[p] + if c.exitChan != nil { + close(c.exitChan) + } + restart := c.restart + pm.mu.RUnlock() + + if restart { + pm.enable(p, c, true) + } return nil } @@ -333,23 +340,10 @@ func (l logHook) Fire(entry *logrus.Entry) error { return nil } -func attachToLog(id string) func(libcontainerd.IOPipe) error { - return func(iop libcontainerd.IOPipe) error { - iop.Stdin.Close() - - logger := logrus.New() - logger.Hooks.Add(logHook{id}) - // TODO: cache writer per id - w := logger.Writer() - go func() { - io.Copy(w, iop.Stdout) - }() - go func() { - // TODO: update logrus and use logger.WriterLevel - io.Copy(w, iop.Stderr) - }() - return nil - } +func makeLoggerStreams(id string) (stdout, stderr io.WriteCloser) { + logger := logrus.New() + logger.Hooks.Add(logHook{id}) + return logger.WriterLevel(logrus.InfoLevel), logger.WriterLevel(logrus.ErrorLevel) } func validatePrivileges(requiredPrivileges, privileges types.PluginPrivileges) error { diff --git a/components/engine/plugin/manager_linux.go b/components/engine/plugin/manager_linux.go index 7c832b55b2..beefc3dfba 100644 --- a/components/engine/plugin/manager_linux.go +++ b/components/engine/plugin/manager_linux.go @@ -11,7 +11,6 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/daemon/initlayer" - "github.com/docker/docker/libcontainerd" "github.com/docker/docker/pkg/containerfs" "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/mount" @@ -63,7 +62,8 @@ func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error { return errors.WithStack(err) } - if err := pm.containerdClient.Create(p.GetID(), "", "", *spec, attachToLog(p.GetID())); err != nil { + stdout, stderr := makeLoggerStreams(p.GetID()) + if err := pm.executor.Create(p.GetID(), *spec, stdout, stderr); err != nil { if p.PropagatedMount != "" { if err := mount.Unmount(p.PropagatedMount); err != nil { logrus.Warnf("Could not unmount %s: %v", p.PropagatedMount, err) @@ -83,7 +83,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error { client, err := plugins.NewClientWithTimeout("unix://"+sockAddr, nil, time.Duration(c.timeoutInSecs)*time.Second) if err != nil { c.restart = false - shutdownPlugin(p, c, pm.containerdClient) + shutdownPlugin(p, c, pm.executor) return errors.WithStack(err) } @@ -109,7 +109,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error { c.restart = false // While restoring plugins, we need to explicitly set the state to disabled pm.config.Store.SetState(p, false) - shutdownPlugin(p, c, pm.containerdClient) + shutdownPlugin(p, c, pm.executor) return err } @@ -121,13 +121,14 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error { } func (pm *Manager) restore(p *v2.Plugin) error { - if err := pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID())); err != nil { + stdout, stderr := makeLoggerStreams(p.GetID()) + if err := pm.executor.Restore(p.GetID(), stdout, stderr); err != nil { return err } if pm.config.LiveRestoreEnabled { c := &controller{} - if pids, _ := pm.containerdClient.GetPidsForContainer(p.GetID()); len(pids) == 0 { + if isRunning, _ := pm.executor.IsRunning(p.GetID()); !isRunning { // plugin is not running, so follow normal startup procedure return pm.enable(p, c, true) } @@ -143,10 +144,10 @@ func (pm *Manager) restore(p *v2.Plugin) error { return nil } -func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.Client) { +func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) { pluginID := p.GetID() - err := containerdClient.Signal(pluginID, int(unix.SIGTERM)) + err := executor.Signal(pluginID, int(unix.SIGTERM)) if err != nil { logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err) } else { @@ -155,7 +156,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd. logrus.Debug("Clean shutdown of plugin") case <-time.After(time.Second * 10): logrus.Debug("Force shutdown plugin") - if err := containerdClient.Signal(pluginID, int(unix.SIGKILL)); err != nil { + if err := executor.Signal(pluginID, int(unix.SIGKILL)); err != nil { logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err) } } @@ -175,7 +176,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error { } c.restart = false - shutdownPlugin(p, c, pm.containerdClient) + shutdownPlugin(p, c, pm.executor) pm.config.Store.SetState(p, false) return pm.save(p) } @@ -192,9 +193,9 @@ func (pm *Manager) Shutdown() { logrus.Debug("Plugin active when liveRestore is set, skipping shutdown") continue } - if pm.containerdClient != nil && p.IsEnabled() { + if pm.executor != nil && p.IsEnabled() { c.restart = false - shutdownPlugin(p, c, pm.containerdClient) + shutdownPlugin(p, c, pm.executor) } } mount.Unmount(pm.config.Root)