Merge component 'engine' from git@github.com:moby/moby master

GordonTheTurtle
2018-01-10 17:07:15 +00:00
56 changed files with 756 additions and 1032 deletions

View File

@ -12,11 +12,11 @@ import (
// Backend is all the methods that need to be implemented
// to provide network specific functionality.
type Backend interface {
FindNetwork(idName string) (libnetwork.Network, error)
FindUniqueNetwork(idName string) (libnetwork.Network, error)
GetNetworks() []libnetwork.Network
CreateNetwork(nc types.NetworkCreateRequest) (*types.NetworkCreateResponse, error)
ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
DisconnectContainerFromNetwork(containerName string, networkName string, force bool) error
DeleteNetwork(name string) error
DeleteNetwork(networkID string) error
NetworksPrune(ctx context.Context, pruneFilters filters.Args) (*types.NetworksPruneReport, error)
}

View File

@ -2,6 +2,7 @@ package network
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
@ -288,7 +289,12 @@ func (n *networkRouter) postNetworkConnect(ctx context.Context, w http.ResponseW
return err
}
return n.backend.ConnectContainerToNetwork(connect.Container, vars["id"], connect.EndpointConfig)
// Always make sure there is no ambiguity with respect to the network ID/name
nw, err := n.backend.FindUniqueNetwork(vars["id"])
if err != nil {
return err
}
return n.backend.ConnectContainerToNetwork(connect.Container, nw.ID(), connect.EndpointConfig)
}
func (n *networkRouter) postNetworkDisconnect(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@ -312,15 +318,19 @@ func (n *networkRouter) deleteNetwork(ctx context.Context, w http.ResponseWriter
if err := httputils.ParseForm(r); err != nil {
return err
}
if _, err := n.cluster.GetNetwork(vars["id"]); err == nil {
if err = n.cluster.RemoveNetwork(vars["id"]); err != nil {
nw, err := n.findUniqueNetwork(vars["id"])
if err != nil {
return err
}
if nw.Scope == "swarm" {
if err = n.cluster.RemoveNetwork(nw.ID); err != nil {
return err
}
} else {
if err := n.backend.DeleteNetwork(nw.ID); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
return nil
}
if err := n.backend.DeleteNetwork(vars["id"]); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
return nil
@ -518,3 +528,79 @@ func (n *networkRouter) postNetworksPrune(ctx context.Context, w http.ResponseWr
}
return httputils.WriteJSON(w, http.StatusOK, pruneReport)
}
// findUniqueNetwork searches for a network across scopes (both local and swarm).
// NOTE: This findUniqueNetwork is different from FindUniqueNetwork in the daemon.
// If multiple networks have duplicate names, return an error.
// Match on the full ID first and return immediately once one is found.
// If a network appears in both swarm and local scope, prefer the local one.
// For full-name and partial-ID matches, save the results first and process them
// later, in case multiple records are found for the same term.
// TODO (yongtang): should we wrap with version here for backward compatibility?
func (n *networkRouter) findUniqueNetwork(term string) (types.NetworkResource, error) {
listByFullName := map[string]types.NetworkResource{}
listByPartialID := map[string]types.NetworkResource{}
nw := n.backend.GetNetworks()
for _, network := range nw {
if network.ID() == term {
return *n.buildDetailedNetworkResources(network, false), nil
}
if network.Name() == term && !network.Info().Ingress() {
// No need to check the ID collision here as we are still in
// local scope and the network ID is unique in this scope.
listByFullName[network.ID()] = *n.buildDetailedNetworkResources(network, false)
}
if strings.HasPrefix(network.ID(), term) {
// No need to check the ID collision here as we are still in
// local scope and the network ID is unique in this scope.
listByPartialID[network.ID()] = *n.buildDetailedNetworkResources(network, false)
}
}
nr, _ := n.cluster.GetNetworks()
for _, network := range nr {
if network.ID == term {
return network, nil
}
if network.Name == term {
// Check the ID collision as we are in swarm scope here, and
// the map (of the listByFullName) may have already had a
// network with the same ID (from local scope previously)
if _, ok := listByFullName[network.ID]; !ok {
listByFullName[network.ID] = network
}
}
if strings.HasPrefix(network.ID, term) {
// Check the ID collision as we are in swarm scope here, and
// the map (of the listByPartialID) may have already had a
// network with the same ID (from local scope previously)
if _, ok := listByPartialID[network.ID]; !ok {
listByPartialID[network.ID] = network
}
}
}
// Find based on full name; succeed only if there are no duplicates
if len(listByFullName) == 1 {
for _, v := range listByFullName {
return v, nil
}
}
if len(listByFullName) > 1 {
return types.NetworkResource{}, fmt.Errorf("network %s is ambiguous (%d matches found based on name)", term, len(listByFullName))
}
// Find based on partial ID; succeed only if there are no duplicates
if len(listByPartialID) == 1 {
for _, v := range listByPartialID {
return v, nil
}
}
if len(listByPartialID) > 1 {
return types.NetworkResource{}, fmt.Errorf("network %s is ambiguous (%d matches found based on ID prefix)", term, len(listByPartialID))
}
return types.NetworkResource{}, libnetwork.ErrNoSuchNetwork(term)
}
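To make the resolution order above concrete, here is a standalone sketch (the netRes type is hypothetical, not the moby API) of the same algorithm: an exact ID match wins immediately, then a unique full-name match, then a unique ID-prefix match. Keying the candidate maps by network ID is what dedupes a network visible in both local and swarm scope; the ingress special case is omitted here.

package main

import (
	"fmt"
	"strings"
)

// netRes stands in for types.NetworkResource in this sketch.
type netRes struct{ ID, Name, Scope string }

// resolve mirrors the lookup order above: exact ID first, then a unique
// full-name match, then a unique ID-prefix match.
func resolve(all []netRes, term string) (netRes, error) {
	byName := map[string]netRes{}
	byPrefix := map[string]netRes{}
	for _, n := range all {
		if n.ID == term {
			return n, nil // a full-ID match always wins
		}
		if n.Name == term {
			byName[n.ID] = n
		}
		if strings.HasPrefix(n.ID, term) {
			byPrefix[n.ID] = n
		}
	}
	// Full-name matches are considered before ID-prefix matches.
	for _, candidates := range []map[string]netRes{byName, byPrefix} {
		if len(candidates) == 1 {
			for _, n := range candidates {
				return n, nil
			}
		}
		if len(candidates) > 1 {
			return netRes{}, fmt.Errorf("network %s is ambiguous (%d matches found)", term, len(candidates))
		}
	}
	return netRes{}, fmt.Errorf("no such network: %s", term)
}

func main() {
	all := []netRes{
		{ID: "aaa111", Name: "foo", Scope: "local"},
		{ID: "bbb222", Name: "foo", Scope: "swarm"},
	}
	_, err := resolve(all, "foo") // two networks named foo: ambiguous
	fmt.Println(err)
	n, _ := resolve(all, "bbb2") // unique ID prefix
	fmt.Println(n.ID)
}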

View File

@ -67,7 +67,7 @@ func installCommonConfigFlags(conf *config.Config, flags *pflag.FlagSet) {
flags.StringVar(&conf.MetricsAddress, "metrics-addr", "", "Set default address and port to serve the metrics api on")
flags.Var(opts.NewListOptsRef(&conf.NodeGenericResources, opts.ValidateSingleGenericResource), "node-generic-resource", "Advertise user-defined resource")
flags.Var(opts.NewListOptsRef(&conf.NodeGenericResources, opts.ValidateSingleGenericResource), "node-generic-resources", "Advertise user-defined resource")
flags.IntVar(&conf.NetworkControlPlaneMTU, "network-control-plane-mtu", config.DefaultNetworkMtu, "Network Control plane MTU")

View File

@ -27,7 +27,6 @@ import (
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools"
@ -1004,7 +1003,7 @@ func (container *Container) CloseStreams() error {
}
// InitializeStdio is called by libcontainerd to connect the stdio.
func (container *Container) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) {
func (container *Container) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
if err := container.startLogging(); err != nil {
container.Reset(false)
return nil, err

View File

@ -102,15 +102,6 @@ func (s *State) String() string {
return fmt.Sprintf("Exited (%d) %s ago", s.ExitCodeValue, units.HumanDuration(time.Now().UTC().Sub(s.FinishedAt)))
}
// HealthString returns a single string to describe health status.
func (s *State) HealthString() string {
if s.Health == nil {
return types.NoHealthcheck
}
return s.Health.String()
}
// IsValidHealthString checks if the provided string is a valid container health status or not.
func IsValidHealthString(s string) bool {
return s == types.Starting ||

View File

@ -7,7 +7,7 @@ import (
"strings"
"sync"
"github.com/docker/docker/libcontainerd"
"github.com/containerd/containerd/cio"
"github.com/docker/docker/pkg/broadcaster"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/pools"
@ -114,7 +114,7 @@ func (c *Config) CloseStreams() error {
}
// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (c *Config) CopyToPipe(iop *libcontainerd.IOPipe) {
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
copyFunc := func(w io.Writer, r io.ReadCloser) {
c.Add(1)
go func() {

View File

@ -295,6 +295,10 @@ func (v *memdbView) GetAllNames() map[string][]string {
// transform maps a (deep) copied Container object to what queries need.
// A lock on the Container is not held because these are immutable deep copies.
func (v *memdbView) transform(container *Container) *Snapshot {
health := types.NoHealthcheck
if container.Health != nil {
health = container.Health.Status()
}
snapshot := &Snapshot{
Container: types.Container{
ID: container.ID,
@ -313,7 +317,7 @@ func (v *memdbView) transform(container *Container) *Snapshot {
Managed: container.Managed,
ExposedPorts: make(nat.PortSet),
PortBindings: make(nat.PortSet),
Health: container.HealthString(),
Health: health,
Running: container.Running,
Paused: container.Paused,
ExitCode: container.ExitCode(),

View File

@ -6,6 +6,7 @@ import (
"path/filepath"
"testing"
"github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
@ -159,3 +160,26 @@ func TestNames(t *testing.T) {
view = db.Snapshot()
assert.Equal(t, map[string][]string{"containerid4": {"name1", "name2"}}, view.GetAllNames())
}
// Test case for GitHub issue 35920
func TestViewWithHealthCheck(t *testing.T) {
var (
db, _ = NewViewDB()
one = newContainer(t)
)
one.Health = &Health{
Health: types.Health{
Status: "starting",
},
}
if err := one.CheckpointTo(db); err != nil {
t.Fatal(err)
}
s, err := db.Snapshot().Get(one.ID)
if err != nil {
t.Fatal(err)
}
if s == nil || s.Health != "starting" {
t.Fatalf("expected Health=starting. Got: %+v", s)
}
}

View File

@ -71,7 +71,7 @@ func (rl *releaseableLayer) Commit(os string) (builder.ReleaseableLayer, error)
if err != nil {
return nil, err
}
// TODO: An optimization woudld be to handle empty layers before returning
// TODO: An optimization would be to handle empty layers before returning
return &releaseableLayer{layerStore: rl.layerStore, roLayer: newLayer}, nil
}

View File

@ -27,8 +27,8 @@ import (
// Backend defines the executor component for a swarm agent.
type Backend interface {
CreateManagedNetwork(clustertypes.NetworkCreateRequest) error
DeleteManagedNetwork(name string) error
FindNetwork(idName string) (libnetwork.Network, error)
DeleteManagedNetwork(networkID string) error
FindUniqueNetwork(idName string) (libnetwork.Network, error)
SetupIngress(clustertypes.NetworkCreateRequest, string) (<-chan struct{}, error)
ReleaseIngress() (<-chan struct{}, error)
PullImage(ctx context.Context, image, tag, platform string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error

View File

@ -143,8 +143,8 @@ func (c *containerAdapter) pullImage(ctx context.Context) error {
}
func (c *containerAdapter) createNetworks(ctx context.Context) error {
for _, network := range c.container.networks() {
ncr, err := c.container.networkCreateRequest(network)
for name := range c.container.networksAttachments {
ncr, err := c.container.networkCreateRequest(name)
if err != nil {
return err
}
@ -162,15 +162,15 @@ func (c *containerAdapter) createNetworks(ctx context.Context) error {
}
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
for _, nid := range c.container.networks() {
if err := c.backend.DeleteManagedNetwork(nid); err != nil {
for name, v := range c.container.networksAttachments {
if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil {
switch err.(type) {
case *libnetwork.ActiveEndpointsError:
continue
case libnetwork.ErrNoSuchNetwork:
continue
default:
log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
log.G(ctx).Errorf("network %s remove failed: %v", name, err)
return err
}
}

View File

@ -507,7 +507,7 @@ func getEndpointConfig(na *api.NetworkAttachment, b executorpkg.Backend) *networ
DriverOpts: na.DriverAttachmentOpts,
}
if v, ok := na.Network.Spec.Annotations.Labels["com.docker.swarm.predefined"]; ok && v == "true" {
if ln, err := b.FindNetwork(na.Network.Spec.Annotations.Name); err == nil {
if ln, err := b.FindUniqueNetwork(na.Network.Spec.Annotations.Name); err == nil {
n.NetworkID = ln.ID()
}
}
@ -575,19 +575,6 @@ func (c *containerConfig) serviceConfig() *clustertypes.ServiceConfig {
return svcCfg
}
// networks returns a list of network names attached to the container. The
// returned name can be used to lookup the corresponding network create
// options.
func (c *containerConfig) networks() []string {
var networks []string
for name := range c.networksAttachments {
networks = append(networks, name)
}
return networks
}
func (c *containerConfig) networkCreateRequest(name string) (clustertypes.NetworkCreateRequest, error) {
na, ok := c.networksAttachments[name]
if !ok {

View File

@ -292,7 +292,7 @@ func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.Control
for i, n := range networks {
apiNetwork, err := getNetwork(ctx, client, n.Target)
if err != nil {
ln, _ := c.config.Backend.FindNetwork(n.Target)
ln, _ := c.config.Backend.FindUniqueNetwork(n.Target)
if ln != nil && runconfig.IsPreDefinedNetwork(ln.Name()) {
// Need to retrieve the corresponding predefined swarm network
// and use its id for the request.

View File

@ -251,8 +251,8 @@ func (daemon *Daemon) updateNetworkSettings(container *container.Container, n li
return runconfig.ErrConflictHostNetwork
}
for s := range container.NetworkSettings.Networks {
sn, err := daemon.FindNetwork(s)
for s, v := range container.NetworkSettings.Networks {
sn, err := daemon.FindUniqueNetwork(getNetworkID(s, v.EndpointSettings))
if err != nil {
continue
}
@ -308,8 +308,8 @@ func (daemon *Daemon) updateNetwork(container *container.Container) error {
// Find if container is connected to the default bridge network
var n libnetwork.Network
for name := range container.NetworkSettings.Networks {
sn, err := daemon.FindNetwork(name)
for name, v := range container.NetworkSettings.Networks {
sn, err := daemon.FindUniqueNetwork(getNetworkID(name, v.EndpointSettings))
if err != nil {
continue
}
@ -339,7 +339,7 @@ func (daemon *Daemon) updateNetwork(container *container.Container) error {
}
func (daemon *Daemon) findAndAttachNetwork(container *container.Container, idOrName string, epConfig *networktypes.EndpointSettings) (libnetwork.Network, *networktypes.NetworkingConfig, error) {
n, err := daemon.FindNetwork(idOrName)
n, err := daemon.FindUniqueNetwork(getNetworkID(idOrName, epConfig))
if err != nil {
// We should always be able to find the network for a
// managed container.
@ -377,16 +377,16 @@ func (daemon *Daemon) findAndAttachNetwork(container *container.Container, idOrN
// trigger attachment in the swarm cluster manager.
if daemon.clusterProvider != nil {
var err error
config, err = daemon.clusterProvider.AttachNetwork(idOrName, container.ID, addresses)
config, err = daemon.clusterProvider.AttachNetwork(getNetworkID(idOrName, epConfig), container.ID, addresses)
if err != nil {
return nil, nil, err
}
}
n, err = daemon.FindNetwork(idOrName)
n, err = daemon.FindUniqueNetwork(getNetworkID(idOrName, epConfig))
if err != nil {
if daemon.clusterProvider != nil {
if err := daemon.clusterProvider.DetachNetwork(idOrName, container.ID); err != nil {
if err := daemon.clusterProvider.DetachNetwork(getNetworkID(idOrName, epConfig), container.ID); err != nil {
logrus.Warnf("Could not rollback attachment for container %s to network %s: %v", container.ID, idOrName, err)
}
}
@ -437,7 +437,7 @@ func (daemon *Daemon) updateContainerNetworkSettings(container *container.Contai
if mode.IsUserDefined() {
var err error
n, err = daemon.FindNetwork(networkName)
n, err = daemon.FindUniqueNetwork(networkName)
if err == nil {
networkName = n.Name()
}
@ -797,7 +797,7 @@ func (daemon *Daemon) connectToNetwork(container *container.Container, idOrName
// ForceEndpointDelete deletes an endpoint from a network forcefully
func (daemon *Daemon) ForceEndpointDelete(name string, networkName string) error {
n, err := daemon.FindNetwork(networkName)
n, err := daemon.FindUniqueNetwork(networkName)
if err != nil {
return err
}
@ -949,7 +949,7 @@ func (daemon *Daemon) releaseNetwork(container *container.Container) {
var networks []libnetwork.Network
for n, epSettings := range settings {
if nw, err := daemon.FindNetwork(n); err == nil {
if nw, err := daemon.FindUniqueNetwork(getNetworkID(n, epSettings.EndpointSettings)); err == nil {
networks = append(networks, nw)
}
@ -993,7 +993,7 @@ func (daemon *Daemon) ConnectToNetwork(container *container.Container, idOrName
return errRemovalContainer(container.ID)
}
n, err := daemon.FindNetwork(idOrName)
n, err := daemon.FindUniqueNetwork(idOrName)
if err == nil && n != nil {
if err := daemon.updateNetworkConfig(container, n, endpointConfig, true); err != nil {
return err
@ -1016,7 +1016,7 @@ func (daemon *Daemon) ConnectToNetwork(container *container.Container, idOrName
// DisconnectFromNetwork disconnects container from network n.
func (daemon *Daemon) DisconnectFromNetwork(container *container.Container, networkName string, force bool) error {
n, err := daemon.FindNetwork(networkName)
n, err := daemon.FindUniqueNetwork(networkName)
container.Lock()
defer container.Unlock()
@ -1087,3 +1087,12 @@ func (daemon *Daemon) DeactivateContainerServiceBinding(containerName string) er
}
return sb.DisableService()
}
func getNetworkID(name string, endpointSettings *networktypes.EndpointSettings) string {
// We only want to prefer NetworkID for user-defined networks.
// For built-in network modes (bridge, none, etc.) the name is preferred (otherwise a restart may cause issues)
if containertypes.NetworkMode(name).IsUserDefined() && endpointSettings != nil && endpointSettings.NetworkID != "" {
return endpointSettings.NetworkID
}
return name
}
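The effect of getNetworkID: for user-defined networks the stored NetworkID is trusted, while built-in modes such as bridge keep resolving by name. A minimal sketch of that behavior follows; the isUserDefined check here is a simplification, not the real containertypes.NetworkMode logic.

package main

import "fmt"

type endpointSettings struct{ NetworkID string }

// isUserDefined is a simplification: the real check lives on
// containertypes.NetworkMode and also excludes container:<id> modes.
func isUserDefined(name string) bool {
	switch name {
	case "default", "bridge", "host", "none":
		return false
	}
	return true
}

func getNetworkID(name string, ep *endpointSettings) string {
	// Prefer the stored ID only for user-defined networks.
	if isUserDefined(name) && ep != nil && ep.NetworkID != "" {
		return ep.NetworkID
	}
	return name
}

func main() {
	ep := &endpointSettings{NetworkID: "1b2c3d"}
	fmt.Println(getNetworkID("mynet", ep))  // 1b2c3d: ID preferred
	fmt.Println(getNetworkID("bridge", ep)) // bridge: name preferred
}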

View File

@ -170,7 +170,7 @@ func (daemon *Daemon) initializeNetworkingPaths(container *container.Container,
if nc.NetworkSettings != nil {
for n := range nc.NetworkSettings.Networks {
sn, err := daemon.FindNetwork(n)
sn, err := daemon.FindUniqueNetwork(n)
if err != nil {
continue
}

View File

@ -317,7 +317,7 @@ func TestValidateContainerIsolation(t *testing.T) {
func TestFindNetworkErrorType(t *testing.T) {
d := Daemon{}
_, err := d.FindNetwork("fakeNet")
_, err := d.FindUniqueNetwork("fakeNet")
_, ok := errors.Cause(err).(libnetwork.ErrNoSuchNetwork)
if !errdefs.IsNotFound(err) || !ok {
assert.Fail(t, "The FindNetwork method MUST always return an error that implements the NotFound interface and is ErrNoSuchNetwork")

View File

@ -226,20 +226,3 @@ func translateContainerdStartErr(cmd string, setExitCode func(int), err error) e
// TODO: it would be nice to get some better errors from containerd so we can return better errors here
return retErr
}
// TODO: cpuguy83 take care of it once the new library is ready
type errNotFound struct{ error }
func (errNotFound) NotFound() {}
func (e errNotFound) Cause() error {
return e.error
}
// notFound is a helper to create an error of the class with the same name from any error type
func notFound(err error) error {
if err == nil {
return nil
}
return errNotFound{err}
}

View File

@ -6,7 +6,6 @@ import (
"github.com/containerd/containerd/cio"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/stringid"
"github.com/sirupsen/logrus"
)
@ -63,7 +62,7 @@ func (i *rio) Wait() {
}
// InitializeStdio is called by libcontainerd to connect the stdio.
func (c *Config) InitializeStdio(iop *libcontainerd.IOPipe) (cio.IO, error) {
func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
c.StreamConfig.CopyToPipe(iop)
if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {

View File

@ -10,6 +10,7 @@ import (
"path/filepath"
"testing"
"github.com/gotestyourself/gotestyourself/fs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
@ -22,7 +23,7 @@ const imageSize = 64 * 1024 * 1024
func TestBlockDev(t *testing.T) {
mkfs, err := exec.LookPath("mkfs.xfs")
if err != nil {
t.Fatal("mkfs.xfs not installed")
t.Skip("mkfs.xfs not found in PATH")
}
// create a sparse image
@ -52,18 +53,11 @@ func TestBlockDev(t *testing.T) {
t.Fatal(err)
}
runTest(t, "testBlockDevQuotaDisabled", wrapMountTest(imageFileName, false, testBlockDevQuotaDisabled))
runTest(t, "testBlockDevQuotaEnabled", wrapMountTest(imageFileName, true, testBlockDevQuotaEnabled))
runTest(t, "testSmallerThanQuota", wrapMountTest(imageFileName, true, wrapQuotaTest(testSmallerThanQuota)))
runTest(t, "testBiggerThanQuota", wrapMountTest(imageFileName, true, wrapQuotaTest(testBiggerThanQuota)))
runTest(t, "testRetrieveQuota", wrapMountTest(imageFileName, true, wrapQuotaTest(testRetrieveQuota)))
}
func runTest(t *testing.T, testName string, testFunc func(*testing.T)) {
if success := t.Run(testName, testFunc); !success {
out, _ := exec.Command("dmesg").CombinedOutput()
t.Log(string(out))
}
t.Run("testBlockDevQuotaDisabled", wrapMountTest(imageFileName, false, testBlockDevQuotaDisabled))
t.Run("testBlockDevQuotaEnabled", wrapMountTest(imageFileName, true, testBlockDevQuotaEnabled))
t.Run("testSmallerThanQuota", wrapMountTest(imageFileName, true, wrapQuotaTest(testSmallerThanQuota)))
t.Run("testBiggerThanQuota", wrapMountTest(imageFileName, true, wrapQuotaTest(testBiggerThanQuota)))
t.Run("testRetrieveQuota", wrapMountTest(imageFileName, true, wrapQuotaTest(testRetrieveQuota)))
}
func wrapMountTest(imageFileName string, enableQuota bool, testFunc func(t *testing.T, mountPoint, backingFsDev string)) func(*testing.T) {
@ -74,25 +68,22 @@ func wrapMountTest(imageFileName string, enableQuota bool, testFunc func(t *test
mountOptions = mountOptions + ",prjquota"
}
// create a mountPoint
mountPoint, err := ioutil.TempDir("", "xfs-mountPoint")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(mountPoint)
mountPointDir := fs.NewDir(t, "xfs-mountPoint")
defer mountPointDir.Remove()
mountPoint := mountPointDir.Path()
out, err := exec.Command("mount", "-o", mountOptions, imageFileName, mountPoint).CombinedOutput()
if len(out) > 0 {
t.Log(string(out))
}
if err != nil {
t.Fatal("mount failed")
_, err := os.Stat("/proc/fs/xfs")
if os.IsNotExist(err) {
t.Skip("no /proc/fs/xfs")
}
}
require.NoError(t, err, "mount failed: %s", out)
defer func() {
if err := unix.Unmount(mountPoint, 0); err != nil {
t.Fatal(err)
}
require.NoError(t, unix.Unmount(mountPoint, 0))
}()
backingFsDev, err := makeBackingFsDev(mountPoint)

View File

@ -29,36 +29,36 @@ func (daemon *Daemon) NetworkControllerEnabled() bool {
return daemon.netController != nil
}
// FindNetwork function finds a network for a given string that can represent network name or id
func (daemon *Daemon) FindNetwork(idName string) (libnetwork.Network, error) {
// 1. match by full ID.
n, err := daemon.GetNetworkByID(idName)
if err == nil || !isNoSuchNetworkError(err) {
return n, err
// FindUniqueNetwork returns a network based on:
// 1. Full ID
// 2. Full Name
// 3. Partial ID
// as long as there is no ambiguity
func (daemon *Daemon) FindUniqueNetwork(term string) (libnetwork.Network, error) {
listByFullName := []libnetwork.Network{}
listByPartialID := []libnetwork.Network{}
for _, nw := range daemon.GetNetworks() {
if nw.ID() == term {
return nw, nil
}
if nw.Name() == term {
listByFullName = append(listByFullName, nw)
}
if strings.HasPrefix(nw.ID(), term) {
listByPartialID = append(listByPartialID, nw)
}
}
// 2. match by full name
n, err = daemon.GetNetworkByName(idName)
if err == nil || !isNoSuchNetworkError(err) {
return n, err
switch {
case len(listByFullName) == 1:
return listByFullName[0], nil
case len(listByFullName) > 1:
return nil, fmt.Errorf("network %s is ambiguous (%d matches found based on name)", term, len(listByFullName))
case len(listByPartialID) == 1:
return listByPartialID[0], nil
case len(listByPartialID) > 1:
return nil, fmt.Errorf("network %s is ambiguous (%d matches found based on ID prefix)", term, len(listByPartialID))
}
// 3. match by ID prefix
list := daemon.GetNetworksByIDPrefix(idName)
if len(list) == 0 {
// Be very careful to change the error type here, the libnetwork.ErrNoSuchNetwork error is used by the controller
// to retry the creation of the network as managed through the swarm manager
return nil, errors.WithStack(notFound(libnetwork.ErrNoSuchNetwork(idName)))
}
if len(list) > 1 {
return nil, errors.WithStack(invalidIdentifier(idName))
}
return list[0], nil
}
func isNoSuchNetworkError(err error) bool {
_, ok := err.(libnetwork.ErrNoSuchNetwork)
return ok
return nil, libnetwork.ErrNoSuchNetwork(term)
}
// GetNetworkByID function returns a network whose ID matches the given ID.
@ -104,7 +104,11 @@ func (daemon *Daemon) GetNetworksByIDPrefix(partialID string) []libnetwork.Netwo
// getAllNetworks returns a list containing all networks
func (daemon *Daemon) getAllNetworks() []libnetwork.Network {
return daemon.netController.Networks()
c := daemon.netController
if c == nil {
return nil
}
return c.Networks()
}
type ingressJob struct {
@ -274,7 +278,9 @@ func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string
// check if user defined CheckDuplicate, if set true, return err
// otherwise prepare a warning message
if create.CheckDuplicate {
return nil, libnetwork.NetworkNameError(create.Name)
if !agent || nw.Info().Dynamic() {
return nil, libnetwork.NetworkNameError(create.Name)
}
}
warning = fmt.Sprintf("Network with name %s (id : %s) already exists", nw.Name(), nw.ID())
}
@ -464,25 +470,56 @@ func (daemon *Daemon) GetNetworkDriverList() []string {
}
// DeleteManagedNetwork deletes an agent network.
// The requirement of networkID is enforced.
func (daemon *Daemon) DeleteManagedNetwork(networkID string) error {
return daemon.deleteNetwork(networkID, true)
n, err := daemon.GetNetworkByID(networkID)
if err != nil {
return err
}
return daemon.deleteNetwork(n, true)
}
// DeleteNetwork destroys a network unless it's one of docker's predefined networks.
func (daemon *Daemon) DeleteNetwork(networkID string) error {
return daemon.deleteNetwork(networkID, false)
}
func (daemon *Daemon) deleteNetwork(networkID string, dynamic bool) error {
nw, err := daemon.FindNetwork(networkID)
n, err := daemon.GetNetworkByID(networkID)
if err != nil {
return err
}
return daemon.deleteNetwork(n, false)
}
if nw.Info().Ingress() {
return nil
func (daemon *Daemon) deleteLoadBalancerSandbox(n libnetwork.Network) {
controller := daemon.netController
//The only endpoint left should be the LB endpoint (nw.Name() + "-endpoint")
endpoints := n.Endpoints()
if len(endpoints) == 1 {
sandboxName := n.Name() + "-sbox"
info := endpoints[0].Info()
if info != nil {
sb := info.Sandbox()
if sb != nil {
if err := sb.DisableService(); err != nil {
logrus.Warnf("Failed to disable service on sandbox %s: %v", sandboxName, err)
//Ignore error and attempt to delete the load balancer endpoint
}
}
}
if err := endpoints[0].Delete(true); err != nil {
logrus.Warnf("Failed to delete endpoint %s (%s) in %s: %v", endpoints[0].Name(), endpoints[0].ID(), sandboxName, err)
//Ignore error and attempt to delete the sandbox.
}
if err := controller.SandboxDestroy(sandboxName); err != nil {
logrus.Warnf("Failed to delete %s sandbox: %v", sandboxName, err)
//Ignore error and attempt to delete the network.
}
}
}
func (daemon *Daemon) deleteNetwork(nw libnetwork.Network, dynamic bool) error {
if runconfig.IsPreDefinedNetwork(nw.Name()) && !dynamic {
err := fmt.Errorf("%s is a pre-defined network and cannot be removed", nw.Name())
return notAllowedError{err}

View File

@ -159,7 +159,7 @@ func (daemon *Daemon) createSpec(c *container.Container) (*specs.Spec, error) {
gwHNSID := ""
if c.NetworkSettings != nil {
for n := range c.NetworkSettings.Networks {
sn, err := daemon.FindNetwork(n)
sn, err := daemon.FindUniqueNetwork(n)
if err != nil {
continue
}

View File

@ -4,6 +4,10 @@ TOMLV_COMMIT=9baf8a8a9f2ed20a8e54160840c492f937eeaf9a
# When updating RUNC_COMMIT, also update runc in vendor.conf accordingly
RUNC_COMMIT=b2567b37d7b75eb4cf325b77297b140ea686ce8f
# containerd is also pinned in vendor.conf. When updating the binary
# version you may also need to update the vendor version to pick up bug
# fixes or new APIs.
CONTAINERD_COMMIT=89623f28b87a6004d4b785663257362d1658a729 # v1.0.0
TINI_COMMIT=949e6facb77383876aeff8a6944dde66b3089574
LIBNETWORK_COMMIT=7b2b1feb1de4817d522cc372af149ff48d25028e

View File

@ -11,6 +11,7 @@ import (
"github.com/docker/docker/client"
"github.com/docker/docker/integration-cli/request"
"github.com/gotestyourself/gotestyourself/poll"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
@ -80,6 +81,68 @@ func TestCreateServiceMultipleTimes(t *testing.T) {
poll.WaitOn(t, networkIsRemoved(client, overlayID), poll.WithTimeout(1*time.Minute), poll.WithDelay(10*time.Second))
}
func TestCreateWithDuplicateNetworkNames(t *testing.T) {
defer setupTest(t)()
d := newSwarm(t)
defer d.Stop(t)
client, err := request.NewClientForHost(d.Sock())
require.NoError(t, err)
name := "foo"
networkCreate := types.NetworkCreate{
CheckDuplicate: false,
Driver: "bridge",
}
n1, err := client.NetworkCreate(context.Background(), name, networkCreate)
require.NoError(t, err)
n2, err := client.NetworkCreate(context.Background(), name, networkCreate)
require.NoError(t, err)
// Duplicates by name but with a different driver
networkCreate.Driver = "overlay"
n3, err := client.NetworkCreate(context.Background(), name, networkCreate)
require.NoError(t, err)
// Create Service with the same name
var instances uint64 = 1
serviceSpec := swarmServiceSpec("top", instances)
serviceSpec.TaskTemplate.Networks = append(serviceSpec.TaskTemplate.Networks, swarm.NetworkAttachmentConfig{Target: name})
service, err := client.ServiceCreate(context.Background(), serviceSpec, types.ServiceCreateOptions{})
require.NoError(t, err)
poll.WaitOn(t, serviceRunningTasksCount(client, service.ID, instances))
resp, _, err := client.ServiceInspectWithRaw(context.Background(), service.ID, types.ServiceInspectOptions{})
require.NoError(t, err)
assert.Equal(t, n3.ID, resp.Spec.TaskTemplate.Networks[0].Target)
// Remove Service
err = client.ServiceRemove(context.Background(), service.ID)
require.NoError(t, err)
// Make sure task has been destroyed.
poll.WaitOn(t, serviceIsRemoved(client, service.ID))
// Remove networks
err = client.NetworkRemove(context.Background(), n3.ID)
require.NoError(t, err)
err = client.NetworkRemove(context.Background(), n2.ID)
require.NoError(t, err)
err = client.NetworkRemove(context.Background(), n1.ID)
require.NoError(t, err)
// Make sure networks have been destroyed.
poll.WaitOn(t, networkIsRemoved(client, n3.ID), poll.WithTimeout(1*time.Minute), poll.WithDelay(10*time.Second))
poll.WaitOn(t, networkIsRemoved(client, n2.ID), poll.WithTimeout(1*time.Minute), poll.WithDelay(10*time.Second))
poll.WaitOn(t, networkIsRemoved(client, n1.ID), poll.WithTimeout(1*time.Minute), poll.WithDelay(10*time.Second))
}
func swarmServiceSpec(name string, replicas uint64) swarm.ServiceSpec {
return swarm.ServiceSpec{
Annotations: swarm.Annotations{

View File

@ -121,8 +121,12 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
c.Lock()
defer c.Unlock()
var rio cio.IO
var dio *cio.DirectIO
defer func() {
if err != nil && dio != nil {
dio.Cancel()
dio.Close()
}
err = wrapError(err)
}()
@ -131,22 +135,16 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
return false, -1, errors.WithStack(err)
}
defer func() {
if err != nil && rio != nil {
rio.Cancel()
rio.Close()
}
}()
t, err := ctr.Task(ctx, func(fifos *cio.FIFOSet) (cio.IO, error) {
io, err := newIOPipe(fifos)
attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
// dio must be assigned to the previously defined dio for the defer above
// to handle cleanup
dio, err = cio.NewDirectIO(ctx, fifos)
if err != nil {
return nil, err
}
rio, err = attachStdio(io)
return rio, err
})
return attachStdio(dio)
}
t, err := ctr.Task(ctx, attachIO)
if err != nil && !errdefs.IsNotFound(errors.Cause(err)) {
return false, -1, err
}
@ -255,7 +253,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
uid, gid := getSpecUser(spec)
t, err = ctr.ctr.NewTask(ctx,
func(id string) (cio.IO, error) {
fifos := newFIFOSet(ctr.bundleDir, id, InitProcessName, withStdin, spec.Process.Terminal)
fifos := newFIFOSet(ctr.bundleDir, InitProcessName, withStdin, spec.Process.Terminal)
rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio)
return rio, err
},
@ -315,7 +313,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
stdinCloseSync = make(chan struct{})
)
fifos := newFIFOSet(ctr.bundleDir, containerID, processID, withStdin, spec.Terminal)
fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal)
defer func() {
if err != nil {
@ -323,7 +321,6 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
rio.Cancel()
rio.Close()
}
rmFIFOSet(fifos)
}
}()
@ -333,10 +330,6 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
})
if err != nil {
close(stdinCloseSync)
if rio != nil {
rio.Cancel()
rio.Close()
}
return -1, err
}
@ -612,7 +605,7 @@ func (c *client) getProcess(containerID, processID string) (containerd.Process,
// createIO creates the io to be used by a process
// This needs to get a pointer to interface as upon closure the process may not have yet been registered
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) {
io, err := newIOPipe(fifos)
io, err := cio.NewDirectIO(context.Background(), fifos)
if err != nil {
return nil, err
}
@ -687,7 +680,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
"container": ei.ContainerID,
}).Error("failed to find container")
} else {
rmFIFOSet(newFIFOSet(ctr.bundleDir, ei.ContainerID, ei.ProcessID, true, false))
newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false).Close()
}
}
})
@ -851,11 +844,9 @@ func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.R
}
func wrapError(err error) error {
if err == nil {
return nil
}
switch {
case err == nil:
return nil
case errdefs.IsNotFound(err):
return wrapNotFoundError(err)
}

View File

@ -80,29 +80,29 @@ func prepareBundleDir(bundleDir string, ociSpec *specs.Spec) (string, error) {
return p, nil
}
func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
fifos := &cio.FIFOSet{
func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
config := cio.Config{
Terminal: withTerminal,
Out: filepath.Join(bundleDir, processID+"-stdout"),
Stdout: filepath.Join(bundleDir, processID+"-stdout"),
}
paths := []string{config.Stdout}
if withStdin {
fifos.In = filepath.Join(bundleDir, processID+"-stdin")
config.Stdin = filepath.Join(bundleDir, processID+"-stdin")
paths = append(paths, config.Stdin)
}
if !fifos.Terminal {
fifos.Err = filepath.Join(bundleDir, processID+"-stderr")
if !withTerminal {
config.Stderr = filepath.Join(bundleDir, processID+"-stderr")
paths = append(paths, config.Stderr)
}
return fifos
}
func rmFIFOSet(fset *cio.FIFOSet) {
for _, fn := range []string{fset.Out, fset.In, fset.Err} {
if fn != "" {
if err := os.RemoveAll(fn); err != nil {
logrus.Warnf("libcontainerd: failed to remove fifo %v: %v", fn, err)
closer := func() error {
for _, path := range paths {
if err := os.RemoveAll(path); err != nil {
logrus.Warnf("libcontainerd: failed to remove fifo %v: %v", path, err)
}
}
return nil
}
return cio.NewFIFOSet(config, closer)
}
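With this change a *cio.FIFOSet carries its own cleanup: Close() invokes the closer, which removes the FIFO paths, replacing the old rmFIFOSet helper. A hedged usage sketch against the containerd cio package as vendored here; the bundle directory and path names are made up.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"

	"github.com/containerd/containerd/cio"
)

func main() {
	// bundleDir stands in for a container's bundle directory.
	bundleDir, err := ioutil.TempDir("", "bundle")
	if err != nil {
		panic(err)
	}

	cfg := cio.Config{
		Stdin:  filepath.Join(bundleDir, "init-stdin"),
		Stdout: filepath.Join(bundleDir, "init-stdout"),
		Stderr: filepath.Join(bundleDir, "init-stderr"),
	}
	// As in newFIFOSet above, the closer owns removal of the FIFO paths.
	fifos := cio.NewFIFOSet(cfg, func() error {
		return os.RemoveAll(bundleDir)
	})

	fmt.Println(fifos.Stdout) // path fields are promoted from the embedded cio.Config
	fifos.Close()             // runs the closer, removing the paths
}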

View File

@ -2,6 +2,7 @@ package libcontainerd
import (
"fmt"
"path/filepath"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/windows/hcsshimtypes"
@ -35,19 +36,20 @@ func pipeName(containerID, processID, name string) string {
return fmt.Sprintf(`\\.\pipe\containerd-%s-%s-%s`, containerID, processID, name)
}
func newFIFOSet(bundleDir, containerID, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
fifos := &cio.FIFOSet{
func newFIFOSet(bundleDir, processID string, withStdin, withTerminal bool) *cio.FIFOSet {
containerID := filepath.Base(bundleDir)
config := cio.Config{
Terminal: withTerminal,
Out: pipeName(containerID, processID, "stdout"),
Stdout: pipeName(containerID, processID, "stdout"),
}
if withStdin {
fifos.In = pipeName(containerID, processID, "stdin")
config.Stdin = pipeName(containerID, processID, "stdin")
}
if !fifos.Terminal {
fifos.Err = pipeName(containerID, processID, "stderr")
if !config.Terminal {
config.Stderr = pipeName(containerID, processID, "stderr")
}
return fifos
return cio.NewFIFOSet(config, nil)
}

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
@ -18,6 +17,7 @@ import (
"github.com/Microsoft/hcsshim"
opengcs "github.com/Microsoft/opengcs/client"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/docker/docker/pkg/sysinfo"
"github.com/docker/docker/pkg/system"
specs "github.com/opencontainers/runtime-spec/specs-go"
@ -670,28 +670,12 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
return p.pid, nil
}
var (
stdout, stderr io.ReadCloser
stdin io.WriteCloser
)
stdin, stdout, stderr, err = newProcess.Stdio()
dio, err := newIOFromProcess(newProcess, ctr.ociSpec.Process.Terminal)
if err != nil {
logger.WithError(err).Error("failed to get stdio pipes")
return -1, err
}
iopipe := &IOPipe{Terminal: ctr.ociSpec.Process.Terminal}
iopipe.Stdin = createStdInCloser(stdin, newProcess)
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
_, err = attachStdio(iopipe)
_, err = attachStdio(dio)
if err != nil {
logger.WithError(err).Error("failed to attache stdio")
return -1, err
@ -727,6 +711,24 @@ func (c *client) Start(_ context.Context, id, _ string, withStdin bool, attachSt
return p.pid, nil
}
func newIOFromProcess(newProcess hcsshim.Process, terminal bool) (*cio.DirectIO, error) {
stdin, stdout, stderr, err := newProcess.Stdio()
if err != nil {
return nil, err
}
dio := cio.NewDirectIO(createStdInCloser(stdin, newProcess), nil, nil, terminal)
// Convert io.ReadClosers to io.Readers
if stdout != nil {
dio.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
dio.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
return dio, nil
}
// Exec adds a process in a running container
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
ctr := c.getContainer(containerID)
@ -781,10 +783,6 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
logger.Debugf("exec commandLine: %s", createProcessParms.CommandLine)
// Start the command running in the container.
var (
stdout, stderr io.ReadCloser
stdin io.WriteCloser
)
newProcess, err := ctr.hcsContainer.CreateProcess(&createProcessParms)
if err != nil {
logger.WithError(err).Errorf("exec's CreateProcess() failed")
@ -807,25 +805,13 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
}
}()
stdin, stdout, stderr, err = newProcess.Stdio()
dio, err := newIOFromProcess(newProcess, spec.Terminal)
if err != nil {
logger.WithError(err).Error("getting std pipes failed")
logger.WithError(err).Error("failed to get stdio pipes")
return -1, err
}
iopipe := &IOPipe{Terminal: spec.Terminal}
iopipe.Stdin = createStdInCloser(stdin, newProcess)
// Convert io.ReadClosers to io.Readers
if stdout != nil {
iopipe.Stdout = ioutil.NopCloser(&autoClosingReader{ReadCloser: stdout})
}
if stderr != nil {
iopipe.Stderr = ioutil.NopCloser(&autoClosingReader{ReadCloser: stderr})
}
// Tell the engine to attach streams back to the client
_, err = attachStdio(iopipe)
_, err = attachStdio(dio)
if err != nil {
return -1, err
}
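Both the Start and Exec paths above wrap the raw HCS pipes with autoClosingReader, which is defined elsewhere in moby's libcontainerd. Roughly, it closes the underlying ReadCloser as soon as a read returns an error, including io.EOF, so the pipe is released once the process side is done. A self-contained sketch of that idea, not the exact moby implementation:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
	"sync"
)

// autoClosingReader closes the wrapped ReadCloser once a read fails
// (including io.EOF), and does so exactly once.
type autoClosingReader struct {
	io.ReadCloser
	once sync.Once
}

func (r *autoClosingReader) Read(b []byte) (n int, err error) {
	n, err = r.ReadCloser.Read(b)
	if err != nil {
		r.once.Do(func() { r.ReadCloser.Close() })
	}
	return n, err
}

func main() {
	src := ioutil.NopCloser(strings.NewReader("hello"))
	r := &autoClosingReader{ReadCloser: src}
	out, _ := ioutil.ReadAll(r) // EOF triggers the close
	fmt.Println(string(out))
}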

View File

@ -1,36 +0,0 @@
package libcontainerd
import "github.com/containerd/containerd/cio"
// Config returns the containerd.IOConfig of this pipe set
func (p *IOPipe) Config() cio.Config {
return p.config
}
// Cancel aborts ongoing operations if they have not completed yet
func (p *IOPipe) Cancel() {
p.cancel()
}
// Wait waits for io operations to finish
func (p *IOPipe) Wait() {
}
// Close closes the underlying pipes
func (p *IOPipe) Close() error {
p.cancel()
if p.Stdin != nil {
p.Stdin.Close()
}
if p.Stdout != nil {
p.Stdout.Close()
}
if p.Stderr != nil {
p.Stderr.Close()
}
return nil
}

View File

@ -1,60 +0,0 @@
// +build !windows
package libcontainerd
import (
"context"
"io"
"syscall"
"github.com/containerd/containerd/cio"
"github.com/containerd/fifo"
"github.com/pkg/errors"
)
func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) {
var (
err error
ctx, cancel = context.WithCancel(context.Background())
f io.ReadWriteCloser
iop = &IOPipe{
Terminal: fifos.Terminal,
cancel: cancel,
config: cio.Config{
Terminal: fifos.Terminal,
Stdin: fifos.In,
Stdout: fifos.Out,
Stderr: fifos.Err,
},
}
)
defer func() {
if err != nil {
cancel()
iop.Close()
}
}()
if fifos.In != "" {
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, errors.WithStack(err)
}
iop.Stdin = f
}
if fifos.Out != "" {
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, errors.WithStack(err)
}
iop.Stdout = f
}
if fifos.Err != "" {
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, errors.WithStack(err)
}
iop.Stderr = f
}
return iop, nil
}

View File

@ -1,138 +0,0 @@
package libcontainerd
import (
"context"
"io"
"net"
"sync"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/cio"
"github.com/pkg/errors"
)
type winpipe struct {
sync.Mutex
ctx context.Context
listener net.Listener
readyCh chan struct{}
readyErr error
client net.Conn
}
func newWinpipe(ctx context.Context, pipe string) (*winpipe, error) {
l, err := winio.ListenPipe(pipe, nil)
if err != nil {
return nil, errors.Wrapf(err, "%q pipe creation failed", pipe)
}
wp := &winpipe{
ctx: ctx,
listener: l,
readyCh: make(chan struct{}),
}
go func() {
go func() {
defer close(wp.readyCh)
defer wp.listener.Close()
c, err := wp.listener.Accept()
if err != nil {
wp.Lock()
if wp.readyErr == nil {
wp.readyErr = err
}
wp.Unlock()
return
}
wp.client = c
}()
select {
case <-wp.readyCh:
case <-ctx.Done():
wp.Lock()
if wp.readyErr == nil {
wp.listener.Close()
wp.readyErr = ctx.Err()
}
wp.Unlock()
}
}()
return wp, nil
}
func (wp *winpipe) Read(b []byte) (int, error) {
select {
case <-wp.ctx.Done():
return 0, wp.ctx.Err()
case <-wp.readyCh:
return wp.client.Read(b)
}
}
func (wp *winpipe) Write(b []byte) (int, error) {
select {
case <-wp.ctx.Done():
return 0, wp.ctx.Err()
case <-wp.readyCh:
return wp.client.Write(b)
}
}
func (wp *winpipe) Close() error {
select {
case <-wp.readyCh:
return wp.client.Close()
default:
return nil
}
}
func newIOPipe(fifos *cio.FIFOSet) (*IOPipe, error) {
var (
err error
ctx, cancel = context.WithCancel(context.Background())
p io.ReadWriteCloser
iop = &IOPipe{
Terminal: fifos.Terminal,
cancel: cancel,
config: cio.Config{
Terminal: fifos.Terminal,
Stdin: fifos.In,
Stdout: fifos.Out,
Stderr: fifos.Err,
},
}
)
defer func() {
if err != nil {
cancel()
iop.Close()
}
}()
if fifos.In != "" {
if p, err = newWinpipe(ctx, fifos.In); err != nil {
return nil, err
}
iop.Stdin = p
}
if fifos.Out != "" {
if p, err = newWinpipe(ctx, fifos.Out); err != nil {
return nil, err
}
iop.Stdout = p
}
if fifos.Err != "" {
if p, err = newWinpipe(ctx, fifos.Err); err != nil {
return nil, err
}
iop.Stderr = p
}
return iop, nil
}

View File

@ -1,56 +0,0 @@
// +build !windows
package libcontainerd
import "github.com/pkg/errors"
// process represents the state for the main container process or an exec.
type process struct {
// id is the logical name of the process
id string
// cid is the container id to which this process belongs
cid string
// pid is the identifier of the process
pid uint32
// io holds the io reader/writer associated with the process
io *IOPipe
// root is the state directory for the process
root string
}
func (p *process) ID() string {
return p.id
}
func (p *process) Pid() uint32 {
return p.pid
}
func (p *process) SetPid(pid uint32) error {
if p.pid != 0 {
return errors.Errorf("pid is already set to %d", pid)
}
p.pid = pid
return nil
}
func (p *process) IOPipe() *IOPipe {
return p.io
}
func (p *process) CloseIO() {
if p.io.Stdin != nil {
p.io.Stdin.Close()
}
if p.io.Stdout != nil {
p.io.Stdout.Close()
}
if p.io.Stderr != nil {
p.io.Stderr.Close()
}
}

View File

@ -1,59 +0,0 @@
package libcontainerd
import (
"os"
"path/filepath"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
var fdNames = map[int]string{
unix.Stdin: "stdin",
unix.Stdout: "stdout",
unix.Stderr: "stderr",
}
func (p *process) pipeName(index int) string {
return filepath.Join(p.root, p.id+"-"+fdNames[index])
}
func (p *process) IOPaths() (string, string, string) {
var (
stdin = p.pipeName(unix.Stdin)
stdout = p.pipeName(unix.Stdout)
stderr = p.pipeName(unix.Stderr)
)
// TODO: debug why we're having zombies when I don't unset those
if p.io.Stdin == nil {
stdin = ""
}
if p.io.Stderr == nil {
stderr = ""
}
return stdin, stdout, stderr
}
func (p *process) Cleanup() error {
var retErr error
// Ensure everything was closed
p.CloseIO()
for _, i := range [3]string{
p.pipeName(unix.Stdin),
p.pipeName(unix.Stdout),
p.pipeName(unix.Stderr),
} {
err := os.Remove(i)
if err != nil {
if retErr == nil {
retErr = errors.Wrapf(err, "failed to remove %s", i)
} else {
retErr = errors.Wrapf(retErr, "failed to remove %s", i)
}
}
}
return retErr
}

View File

@ -2,7 +2,6 @@ package libcontainerd
import (
"context"
"io"
"time"
"github.com/containerd/containerd"
@ -107,20 +106,4 @@ type Client interface {
}
// StdioCallback is called to connect a container or process stdio.
type StdioCallback func(*IOPipe) (cio.IO, error)
// IOPipe contains the stdio streams.
type IOPipe struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
Terminal bool // Whether stderr is connected on Windows
cancel context.CancelFunc
config cio.Config
}
// ServerVersion contains version information as retrieved from the
// server
type ServerVersion struct {
}
type StdioCallback func(io *cio.DirectIO) (cio.IO, error)

View File

@ -122,7 +122,7 @@ func (c *rio) Wait() {
}
func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerd.StdioCallback {
return func(iop *libcontainerd.IOPipe) (cio.IO, error) {
return func(iop *cio.DirectIO) (cio.IO, error) {
if iop.Stdin != nil {
iop.Stdin.Close()
// closing stdin shouldn't be needed here, it should never be open

View File

@ -1,7 +1,7 @@
# the following lines are in sorted order, FYI
github.com/Azure/go-ansiterm d6e3b3328b783f23731bc4d058875b0371ff8109
github.com/Microsoft/hcsshim v0.6.8
github.com/Microsoft/go-winio v0.4.5
github.com/Microsoft/go-winio v0.4.6
github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76
github.com/docker/libtrust 9cbd2a1374f46905c68a4eb3694a130610adc62a
github.com/go-check/check 4ed411733c5785b40214c70bce814c3a3a689609 https://github.com/cpuguy83/check.git
@ -66,8 +66,8 @@ google.golang.org/grpc v1.3.0
# When updating, also update RUNC_COMMIT in hack/dockerfile/binaries-commits accordingly
github.com/opencontainers/runc b2567b37d7b75eb4cf325b77297b140ea686ce8f
github.com/opencontainers/runtime-spec v1.0.0
github.com/opencontainers/image-spec v1.0.0
github.com/opencontainers/runtime-spec v1.0.1
github.com/opencontainers/image-spec v1.0.1
github.com/seccomp/libseccomp-golang 32f571b70023028bd57d9288c20efbcb237f3ce0
# libcontainer deps (see src/github.com/opencontainers/runc/Godeps/Godeps.json)
@ -103,7 +103,7 @@ github.com/googleapis/gax-go da06d194a00e19ce00d9011a13931c3f6f6887c7
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
# containerd
github.com/containerd/containerd 89623f28b87a6004d4b785663257362d1658a729 # v1.0.0
github.com/containerd/containerd 3fa104f843ec92328912e042b767d26825f202aa
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/continuity 35d55c5e8dd23b32037d56cf97174aff3efdfa83
github.com/containerd/cgroups 29da22c6171a4316169f9205ab6c49f59b5b852f

View File

@ -22,6 +22,7 @@ import (
const (
cERROR_PIPE_BUSY = syscall.Errno(231)
cERROR_NO_DATA = syscall.Errno(232)
cERROR_PIPE_CONNECTED = syscall.Errno(535)
cERROR_SEM_TIMEOUT = syscall.Errno(121)
@ -254,6 +255,36 @@ func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
return f, nil
}
func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
p, err := l.makeServerPipe()
if err != nil {
return nil, err
}
// Wait for the client to connect.
ch := make(chan error)
go func(p *win32File) {
ch <- connectPipe(p)
}(p)
select {
case err = <-ch:
if err != nil {
p.Close()
p = nil
}
case <-l.closeCh:
// Abort the connect request by closing the handle.
p.Close()
p = nil
err = <-ch
if err == nil || err == ErrFileClosed {
err = ErrPipeListenerClosed
}
}
return p, err
}
func (l *win32PipeListener) listenerRoutine() {
closed := false
for !closed {
@ -261,31 +292,20 @@ func (l *win32PipeListener) listenerRoutine() {
case <-l.closeCh:
closed = true
case responseCh := <-l.acceptCh:
p, err := l.makeServerPipe()
if err == nil {
// Wait for the client to connect.
ch := make(chan error)
go func(p *win32File) {
ch <- connectPipe(p)
}(p)
select {
case err = <-ch:
if err != nil {
p.Close()
p = nil
}
case <-l.closeCh:
// Abort the connect request by closing the handle.
p.Close()
p = nil
err = <-ch
if err == nil || err == ErrFileClosed {
err = ErrPipeListenerClosed
}
closed = true
var (
p *win32File
err error
)
for {
p, err = l.makeConnectedServerPipe()
// If the connection was immediately closed by the client, try
// again.
if err != cERROR_NO_DATA {
break
}
}
responseCh <- acceptResponse{p, err}
closed = err == ErrPipeListenerClosed
}
}
syscall.Close(l.firstHandle)

View File

@ -381,8 +381,10 @@ func (cw *changeWriter) HandleChange(k fs.ChangeKind, p string, f os.FileInfo, e
additionalLinks = cw.inodeRefs[inode]
delete(cw.inodeRefs, inode)
}
} else if k == fs.ChangeKindUnmodified {
} else if k == fs.ChangeKindUnmodified && !f.IsDir() {
// Nothing to write to diff
// Unmodified directories should still be written to keep
// directory permissions correct on direct unpack
return nil
}
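The motivation for keeping unmodified directories in the diff: when a layer tar is unpacked directly rather than applied over its parent, a file entry whose parent directory header is missing gets that directory created implicitly with default permissions. A small archive/tar illustration with hypothetical paths:

package main

import (
	"archive/tar"
	"fmt"
	"io/ioutil"
	"os"
)

func main() {
	f, err := ioutil.TempFile("", "layer-*.tar")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	tw := tar.NewWriter(f)
	// Without this directory header, extracting the tar directly creates
	// "secret/" implicitly with default (umask-derived) permissions rather
	// than the intended 0700, even though the directory itself is unmodified.
	tw.WriteHeader(&tar.Header{Name: "secret/", Typeflag: tar.TypeDir, Mode: 0700})
	tw.WriteHeader(&tar.Header{Name: "secret/key", Typeflag: tar.TypeReg, Mode: 0600, Size: 0})
	tw.Close()
	fmt.Println("wrote", f.Name())
}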

View File

@ -8,7 +8,7 @@ import (
"sync"
)
// Config holds the io configurations.
// Config holds the IO configurations.
type Config struct {
// Terminal is true if one has been allocated
Terminal bool
@ -24,48 +24,17 @@ type Config struct {
type IO interface {
// Config returns the IO configuration.
Config() Config
// Cancel aborts all current io operations
// Cancel aborts all current io operations.
Cancel()
// Wait blocks until all io copy operations have completed
// Wait blocks until all io copy operations have completed.
Wait()
// Close cleans up all open io resources
// Close cleans up all open io resources. Cancel() is always called before
// Close()
Close() error
}
// cio is a basic container IO implementation.
type cio struct {
config Config
closer *wgCloser
}
func (c *cio) Config() Config {
return c.config
}
func (c *cio) Cancel() {
if c.closer == nil {
return
}
c.closer.Cancel()
}
func (c *cio) Wait() {
if c.closer == nil {
return
}
c.closer.Wait()
}
func (c *cio) Close() error {
if c.closer == nil {
return nil
}
return c.closer.Close()
}
// Creation creates new IO sets for a task
type Creation func(id string) (IO, error)
// Creator creates new IO sets for a task
type Creator func(id string) (IO, error)
// Attach allows callers to reattach to running tasks
//
@ -74,123 +43,138 @@ type Creation func(id string) (IO, error)
// will be sent only to the first reads
type Attach func(*FIFOSet) (IO, error)
// NewIO returns an Creation that will provide IO sets without a terminal
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creation {
return NewIOWithTerminal(stdin, stdout, stderr, false)
}
// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creation {
return func(id string) (_ IO, err error) {
paths, err := NewFifos(id)
if err != nil {
return nil, err
}
defer func() {
if err != nil && paths.Dir != "" {
os.RemoveAll(paths.Dir)
}
}()
cfg := Config{
Terminal: terminal,
Stdout: paths.Out,
Stderr: paths.Err,
Stdin: paths.In,
}
i := &cio{config: cfg}
set := &ioSet{
in: stdin,
out: stdout,
err: stderr,
}
closer, err := copyIO(paths, set, cfg.Terminal)
if err != nil {
return nil, err
}
i.closer = closer
return i, nil
}
}
// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
return func(paths *FIFOSet) (IO, error) {
if paths == nil {
return nil, fmt.Errorf("cannot attach to existing fifos")
}
cfg := Config{
Terminal: paths.Terminal,
Stdout: paths.Out,
Stderr: paths.Err,
Stdin: paths.In,
}
i := &cio{config: cfg}
set := &ioSet{
in: stdin,
out: stdout,
err: stderr,
}
closer, err := copyIO(paths, set, cfg.Terminal)
if err != nil {
return nil, err
}
i.closer = closer
return i, nil
}
}
// Stdio returns an IO set to be used for a task
// that outputs the container's IO as the current processes Stdio
func Stdio(id string) (IO, error) {
return NewIO(os.Stdin, os.Stdout, os.Stderr)(id)
}
// StdioTerminal will setup the IO for the task to use a terminal
func StdioTerminal(id string) (IO, error) {
return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id)
}
// NullIO redirects the container's IO into /dev/null
func NullIO(id string) (IO, error) {
return &cio{}, nil
}
// FIFOSet is a set of fifos for use with tasks
// FIFOSet is a set of file paths to FIFOs for a task's standard IO streams
type FIFOSet struct {
// Dir is the directory holding the task fifos
Dir string
// In, Out, and Err fifo paths
In, Out, Err string
// Terminal returns true if a terminal is being used for the task
Terminal bool
Config
close func() error
}
type ioSet struct {
in io.Reader
out, err io.Writer
}
type wgCloser struct {
wg *sync.WaitGroup
dir string
set []io.Closer
cancel context.CancelFunc
}
func (g *wgCloser) Wait() {
g.wg.Wait()
}
func (g *wgCloser) Close() error {
for _, f := range g.set {
f.Close()
}
if g.dir != "" {
return os.RemoveAll(g.dir)
// Close the FIFOSet
func (f *FIFOSet) Close() error {
if f.close != nil {
return f.close()
}
return nil
}
func (g *wgCloser) Cancel() {
g.cancel()
// NewFIFOSet returns a new FIFOSet from a Config and a close function
func NewFIFOSet(config Config, close func() error) *FIFOSet {
return &FIFOSet{Config: config, close: close}
}
// Streams used to configure a Creator or Attach
type Streams struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Terminal bool
}
// Opt customize options for creating a Creator or Attach
type Opt func(*Streams)
// WithStdio sets stream options to the standard input/output streams
func WithStdio(opt *Streams) {
WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
}
// WithTerminal sets the terminal option
func WithTerminal(opt *Streams) {
opt.Terminal = true
}
// WithStreams sets the stream options to the specified Reader and Writers
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
return func(opt *Streams) {
opt.Stdin = stdin
opt.Stdout = stdout
opt.Stderr = stderr
}
}
// NewCreator returns an IO creator from the options
func NewCreator(opts ...Opt) Creator {
streams := &Streams{}
for _, opt := range opts {
opt(streams)
}
return func(id string) (IO, error) {
// TODO: accept root as a param
root := "/run/containerd/fifo"
fifos, err := NewFIFOSetInDir(root, id, streams.Terminal)
if err != nil {
return nil, err
}
return copyIO(fifos, streams)
}
}
// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
func NewAttach(opts ...Opt) Attach {
streams := &Streams{}
for _, opt := range opts {
opt(streams)
}
return func(fifos *FIFOSet) (IO, error) {
if fifos == nil {
return nil, fmt.Errorf("cannot attach, missing fifos")
}
return copyIO(fifos, streams)
}
}
// NullIO redirects the container's IO into /dev/null
func NullIO(_ string) (IO, error) {
return &cio{}, nil
}
// cio is a basic container IO implementation.
type cio struct {
config Config
wg *sync.WaitGroup
closers []io.Closer
cancel context.CancelFunc
}
func (c *cio) Config() Config {
return c.config
}
func (c *cio) Wait() {
if c.wg != nil {
c.wg.Wait()
}
}
func (c *cio) Close() error {
var lastErr error
for _, closer := range c.closers {
if closer == nil {
continue
}
if err := closer.Close(); err != nil {
lastErr = err
}
}
return lastErr
}
func (c *cio) Cancel() {
if c.cancel != nil {
c.cancel()
}
}
type pipes struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}
// DirectIO allows task IO to be handled externally by the caller
type DirectIO struct {
pipes
cio
}
var _ IO = &DirectIO{}
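// The blank assignment above is a compile-time guard: the build breaks if
// *DirectIO ever stops satisfying IO, at no runtime cost.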

View File

@ -12,173 +12,115 @@ import (
"syscall"
"github.com/containerd/fifo"
"github.com/pkg/errors"
)
// NewFifos returns a new set of fifos for the task
func NewFifos(id string) (*FIFOSet, error) {
	root := "/run/containerd/fifo"
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	dir, err := ioutil.TempDir(root, "")
	if err != nil {
		return nil, err
	}
	return &FIFOSet{
		Dir: dir,
		In:  filepath.Join(dir, id+"-stdin"),
		Out: filepath.Join(dir, id+"-stdout"),
		Err: filepath.Join(dir, id+"-stderr"),
	}, nil
}
// NewFIFOSetInDir returns a new FIFOSet with paths in a temporary directory under root
func NewFIFOSetInDir(root, id string, terminal bool) (*FIFOSet, error) {
	if root != "" {
		if err := os.MkdirAll(root, 0700); err != nil {
			return nil, err
		}
	}
	dir, err := ioutil.TempDir(root, "")
	if err != nil {
		return nil, err
	}
	closer := func() error {
		return os.RemoveAll(dir)
	}
	return NewFIFOSet(Config{
		Stdin:    filepath.Join(dir, id+"-stdin"),
		Stdout:   filepath.Join(dir, id+"-stdout"),
		Stderr:   filepath.Join(dir, id+"-stderr"),
		Terminal: terminal,
	}, closer), nil
}
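// Illustrative sketch: with an empty root the FIFOs land in a fresh directory
// under the system temp dir, and Close tears the directory down:
//
//	fifos, err := cio.NewFIFOSetInDir("", "my-task", false)
//	if err != nil {
//		return err
//	}
//	defer fifos.Close()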
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{}
)
defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
set = append(set, f)
go func(w io.WriteCloser) {
io.Copy(w, ioset.in)
w.Close()
}(f)
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
set = append(set, f)
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.out, r)
r.Close()
wg.Done()
}(f)
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, err
}
set = append(set, f)
if !tty {
wg.Add(1)
go func(r io.ReadCloser) {
io.Copy(ioset.err, r)
r.Close()
wg.Done()
}(f)
}
return &wgCloser{
wg: wg,
dir: fifos.Dir,
set: set,
cancel: cancel,
}, nil
}
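// Only the stdout and stderr copies join the WaitGroup: Wait is meant to
// drain the task's output without blocking on a stdin stream the caller may
// never close.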
// NewDirectIO returns an IO implementation that exposes the pipes directly
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
	set, err := NewFifos("")
	if err != nil {
		return nil, err
	}
	f := &DirectIO{
		set:      set,
		terminal: terminal,
	}
	defer func() {
		if err != nil {
			f.Delete()
		}
	}()
	if f.Stdin, err = fifo.OpenFifo(ctx, set.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
		return nil, err
	}
	if f.Stdout, err = fifo.OpenFifo(ctx, set.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
		f.Stdin.Close()
		return nil, err
	}
	if f.Stderr, err = fifo.OpenFifo(ctx, set.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
		f.Stdin.Close()
		f.Stdout.Close()
		return nil, err
	}
	return f, nil
}
func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
	var ctx, cancel = context.WithCancel(context.Background())
	pipes, err := openFifos(ctx, fifos)
	if err != nil {
		cancel()
		return nil, err
	}
	if fifos.Stdin != "" {
		go func() {
			io.Copy(pipes.Stdin, ioset.Stdin)
			pipes.Stdin.Close()
		}()
	}
	var wg = &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		io.Copy(ioset.Stdout, pipes.Stdout)
		pipes.Stdout.Close()
		wg.Done()
	}()
	if !fifos.Terminal {
		wg.Add(1)
		go func() {
			io.Copy(ioset.Stderr, pipes.Stderr)
			pipes.Stderr.Close()
			wg.Done()
		}()
	}
	return &cio{
		config:  fifos.Config,
		wg:      wg,
		closers: append(pipes.closers(), fifos),
		cancel:  cancel,
	}, nil
}
func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) {
	var err error
	defer func() {
		if err != nil {
			fifos.Close()
		}
	}()
	var f pipes
	if fifos.Stdin != "" {
		if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
			return f, errors.Wrapf(err, "failed to open stdin fifo")
		}
	}
	if fifos.Stdout != "" {
		if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
			f.Stdin.Close()
			return f, errors.Wrapf(err, "failed to open stdout fifo")
		}
	}
	if fifos.Stderr != "" {
		if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
			f.Stdin.Close()
			f.Stdout.Close()
			return f, errors.Wrapf(err, "failed to open stderr fifo")
		}
	}
	return f, nil
}
// DirectIO allows task IO to be handled externally by the caller
type DirectIO struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
set *FIFOSet
terminal bool
}
// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
// and io.WriteCloser.
func NewDirectIO(ctx context.Context, fifos *FIFOSet) (*DirectIO, error) {
ctx, cancel := context.WithCancel(ctx)
pipes, err := openFifos(ctx, fifos)
return &DirectIO{
pipes: pipes,
cio: cio{
config: fifos.Config,
closers: append(pipes.closers(), fifos),
cancel: cancel,
},
}, err
}
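// Illustrative sketch: DirectIO hands the raw pipes to the caller, who then
// owns all copying (id, ctx, container, and logFile are assumed):
//
//	fifos, err := cio.NewFIFOSetInDir("", id, false)
//	if err != nil {
//		return err
//	}
//	dio, err := cio.NewDirectIO(ctx, fifos)
//	if err != nil {
//		return err
//	}
//	task, err := container.NewTask(ctx, func(string) (cio.IO, error) { return dio, nil })
//	go io.Copy(logFile, dio.Stdout) // caller-managed copy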
// IOCreate returns IO available for use with task creation
func (f *DirectIO) IOCreate(id string) (IO, error) {
return f, nil
}
// IOAttach returns IO available for use with task attachment
func (f *DirectIO) IOAttach(set *FIFOSet) (IO, error) {
return f, nil
}
// Config returns the Config
func (f *DirectIO) Config() Config {
return Config{
Terminal: f.terminal,
Stdin: f.set.In,
Stdout: f.set.Out,
Stderr: f.set.Err,
}
}
// Cancel stops any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Cancel() {
// nothing to cancel as all operations are handled externally
}
// Wait on any IO copy operations
//
// Not applicable for DirectIO
func (f *DirectIO) Wait() {
// nothing to wait on as all operations are handled externally
}
// Close closes all open fds
func (f *DirectIO) Close() error {
err := f.Stdin.Close()
if err2 := f.Stdout.Close(); err == nil {
err = err2
}
if err2 := f.Stderr.Close(); err == nil {
err = err2
}
return err
}
// Delete removes the underlying directory containing fifos
func (f *DirectIO) Delete() error {
if f.set.Dir == "" {
return nil
}
return os.RemoveAll(f.set.Dir)
}
func (p *pipes) closers() []io.Closer {
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
}

View File

@ -13,25 +13,26 @@ import (
const pipeRoot = `\\.\pipe`
// NewFifos returns a new set of fifos for the task
func NewFifos(id string) (*FIFOSet, error) {
return &FIFOSet{
In: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
Out: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
Err: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
}, nil
}
// NewFIFOSetInDir returns a new set of fifos for the task
func NewFIFOSetInDir(_, id string, terminal bool) (*FIFOSet, error) {
return NewFIFOSet(Config{
Terminal: terminal,
Stdin: fmt.Sprintf(`%s\ctr-%s-stdin`, pipeRoot, id),
Stdout: fmt.Sprintf(`%s\ctr-%s-stdout`, pipeRoot, id),
Stderr: fmt.Sprintf(`%s\ctr-%s-stderr`, pipeRoot, id),
}, nil), nil
}
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
var (
wg sync.WaitGroup
set []io.Closer
)
if fifos.In != "" {
l, err := winio.ListenPipe(fifos.In, nil)
if fifos.Stdin != "" {
l, err := winio.ListenPipe(fifos.Stdin, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.In)
return nil, errors.Wrapf(err, "failed to create stdin pipe %s", fifos.Stdin)
}
defer func(l net.Listener) {
if err != nil {
@ -43,19 +44,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
go func() {
c, err := l.Accept()
if err != nil {
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.In)
log.L.WithError(err).Errorf("failed to accept stdin connection on %s", fifos.Stdin)
return
}
io.Copy(c, ioset.in)
io.Copy(c, ioset.Stdin)
c.Close()
l.Close()
}()
}
if fifos.Out != "" {
l, err := winio.ListenPipe(fifos.Out, nil)
if fifos.Stdout != "" {
l, err := winio.ListenPipe(fifos.Stdout, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to create stdout pipe %s", fifos.Out)
return nil, errors.Wrapf(err, "failed to create stdout pipe %s", fifos.Stdout)
}
defer func(l net.Listener) {
if err != nil {
@ -69,19 +70,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
defer wg.Done()
c, err := l.Accept()
if err != nil {
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Out)
log.L.WithError(err).Errorf("failed to accept stdout connection on %s", fifos.Stdout)
return
}
io.Copy(ioset.out, c)
io.Copy(ioset.Stdout, c)
c.Close()
l.Close()
}()
}
if !tty && fifos.Err != "" {
l, err := winio.ListenPipe(fifos.Err, nil)
if !fifos.Terminal && fifos.Stderr != "" {
l, err := winio.ListenPipe(fifos.Stderr, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Err)
return nil, errors.Wrapf(err, "failed to create stderr pipe %s", fifos.Stderr)
}
defer func(l net.Listener) {
if err != nil {
@ -95,23 +96,29 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
defer wg.Done()
c, err := l.Accept()
if err != nil {
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Err)
log.L.WithError(err).Errorf("failed to accept stderr connection on %s", fifos.Stderr)
return
}
io.Copy(ioset.err, c)
io.Copy(ioset.Stderr, c)
c.Close()
l.Close()
}()
}
return &wgCloser{
wg: &wg,
dir: fifos.Dir,
set: set,
cancel: func() {
for _, l := range set {
l.Close()
}
},
}, nil
return &cio{config: fifos.Config, closers: set}, nil
}
// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
// and io.WriteCloser.
func NewDirectIO(stdin io.WriteCloser, stdout, stderr io.ReadCloser, terminal bool) *DirectIO {
return &DirectIO{
pipes: pipes{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
},
cio: cio{
config: Config{Terminal: terminal},
},
}
}
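// Illustrative sketch: on Windows the "fifos" are named pipes under \\.\pipe,
// so an external process can attach with go-winio (the pipe name is assumed
// to match the format above):
//
//	conn, err := winio.DialPipe(`\\.\pipe\ctr-`+id+`-stdout`, nil)
//	if err != nil {
//		return err
//	}
//	go io.Copy(os.Stdout, conn)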

View File

@ -3,6 +3,7 @@ package containerd
import (
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
@ -26,7 +27,7 @@ type Container interface {
// Delete removes the container
Delete(context.Context, ...DeleteOpts) error
// NewTask creates a new task based on the container metadata
NewTask(context.Context, cio.Creation, ...NewTaskOpts) (Task, error)
NewTask(context.Context, cio.Creator, ...NewTaskOpts) (Task, error)
// Spec returns the OCI runtime specification
Spec(context.Context) (*specs.Spec, error)
// Task returns the current task for the container
@ -162,7 +163,7 @@ func (c *container) Image(ctx context.Context) (Image, error) {
}, nil
}
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creation, opts ...NewTaskOpts) (_ Task, err error) {
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
i, err := ioCreate(c.id)
if err != nil {
return nil, err
@ -288,20 +289,23 @@ func (c *container) get(ctx context.Context) (containers.Container, error) {
return c.client.ContainerService().Get(ctx, c.id)
}
// get the existing fifo paths from the task information stored by the daemon
func attachExistingIO(response *tasks.GetResponse, ioAttach cio.Attach) (cio.IO, error) {
paths := &cio.FIFOSet{
Dir: getFifoDir([]string{
response.Process.Stdin,
response.Process.Stdout,
response.Process.Stderr,
}),
In: response.Process.Stdin,
Out: response.Process.Stdout,
Err: response.Process.Stderr,
Terminal: response.Process.Terminal,
	}
path := getFifoDir([]string{
response.Process.Stdin,
response.Process.Stdout,
response.Process.Stderr,
})
closer := func() error {
return os.RemoveAll(path)
}
return ioAttach(paths)
fifoSet := cio.NewFIFOSet(cio.Config{
Stdin: response.Process.Stdin,
Stdout: response.Process.Stdout,
Stderr: response.Process.Stderr,
Terminal: response.Process.Terminal,
}, closer)
return ioAttach(fifoSet)
}
// getFifoDir looks for any non-empty path for a stdio fifo

View File

@ -77,7 +77,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
r, err = seekReader(r, ws.Offset, size)
if err != nil {
if !isUnseekable(err) {
return errors.Wrapf(err, "unabled to resume write to %v", ws.Ref)
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
}
// reader is unseekable, try to move the writer back to the start.

View File

@ -2,6 +2,7 @@ package local
import (
"context"
"io"
"os"
"path/filepath"
"runtime"
@ -167,5 +168,8 @@ func (w *writer) Truncate(size int64) error {
}
w.offset = 0
w.digester.Hash().Reset()
if _, err := w.fp.Seek(0, io.SeekStart); err != nil {
return err
}
return w.fp.Truncate(0)
}
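// The added Seek matters because Truncate changes only the file's length, not
// the descriptor's offset; without the reset the next Write would land at the
// old offset and leave a run of NUL bytes. A minimal illustration (names
// assumed):
//
//	f, _ := ioutil.TempFile("", "w")
//	f.Write([]byte("hello")) // offset is now 5
//	f.Truncate(0)            // length 0, offset still 5
//	f.Write([]byte("x"))     // without a Seek, file becomes "\x00\x00\x00\x00\x00x"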

View File

@ -47,7 +47,10 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
}
defer f.Close()
cmd := newCommand(binary, daemonAddress, debug, config, f)
cmd, err := newCommand(binary, daemonAddress, debug, config, f)
if err != nil {
return nil, nil, err
}
ec, err := reaper.Default.Start(cmd)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
@ -87,10 +90,10 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
}
}
func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File) *exec.Cmd {
func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File) (*exec.Cmd, error) {
selfExe, err := os.Executable()
if err != nil {
panic(err)
return nil, err
}
args := []string{
"-namespace", config.Namespace,
@ -123,7 +126,7 @@ func newCommand(binary, daemonAddress string, debug bool, config shim.Config, so
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
return cmd
return cmd, nil
}
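// Returning the error here (rather than the previous panic when
// os.Executable fails) lets WithStart report shim start-up problems to its
// caller instead of crashing the daemon.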
func newSocket(address string) (*net.UnixListener, error) {

View File

@ -235,8 +235,8 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
// State returns runtime state information for a process
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
}

View File

@ -282,11 +282,14 @@ func (t *Task) Update(ctx context.Context, resources *types.Any) error {
// Process returns a specific process inside the task by the process id
func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
// TODO: verify process exists for container
return &Process{
p := &Process{
id: id,
t: t,
}, nil
}
if _, err := p.State(ctx); err != nil {
return nil, err
}
return p, nil
}
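// Routing Process through State means a request for an unknown exec ID now
// fails fast instead of returning a handle to a process that does not exist.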
// Metrics returns runtime specific system level metric information for the task

View File

@ -191,6 +191,7 @@ func (p *process) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitS
return nil, errdefs.FromGRPC(err)
}
if p.io != nil {
p.io.Cancel()
p.io.Wait()
p.io.Close()
}
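// The added Cancel stops the IO copy goroutines before Wait drains any
// in-flight output and Close releases the FIFOs; skipping it could leave
// Wait blocked on pipes that will never produce more data.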

View File

@ -1,31 +0,0 @@
package remotes
import "strings"
// HintExists returns true if a hint of the provided kind and value exists in
// the set of provided hints.
func HintExists(kind, value string, hints ...string) bool {
for _, hint := range hints {
if strings.HasPrefix(hint, kind) && strings.HasSuffix(hint, value) {
return true
}
}
return false
}
// HintValues returns a slice of the values of the hints that match kind.
func HintValues(kind string, hints ...string) []string {
var values []string
for _, hint := range hints {
if strings.HasPrefix(hint, kind) {
parts := strings.SplitN(hint, ":", 2)
if len(parts) < 2 {
continue
}
values = append(values, parts[1])
}
}
return values
}

View File

@ -19,9 +19,9 @@ func apply(ctx context.Context, config *Config) error {
}
}
if config.OOMScore != 0 {
log.G(ctx).Infof("changing OOM score to %d", config.OOMScore)
log.G(ctx).Debugf("changing OOM score to %d", config.OOMScore)
if err := sys.SetOOMScore(os.Getpid(), config.OOMScore); err != nil {
return err
log.G(ctx).WithError(err).Errorf("failed to change OOM score to %d", config.OOMScore)
}
}
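// Logging the failure instead of returning it keeps the daemon starting on
// hosts where the OOM score cannot be adjusted, for example when running
// without the needed privilege; the previous hard failure aborted start-up.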
if config.Cgroup.Path != "" {

View File

@ -123,7 +123,7 @@ type Task interface {
// Resume the execution of the task
Resume(context.Context) error
// Exec creates a new process inside the task
Exec(context.Context, string, *specs.Process, cio.Creation) (Process, error)
Exec(context.Context, string, *specs.Process, cio.Creator) (Process, error)
// Pids returns a list of system specific process ids inside the task
Pids(context.Context) ([]ProcessInfo, error)
// Checkpoint serializes the runtime and memory information of a task into an
@ -163,6 +163,7 @@ func (t *task) Start(ctx context.Context) error {
ContainerID: t.id,
})
if err != nil {
t.io.Cancel()
t.io.Close()
return errdefs.FromGRPC(err)
}
@ -277,7 +278,7 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat
return &ExitStatus{code: r.ExitStatus, exitedAt: r.ExitedAt}, nil
}
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creation) (_ Process, err error) {
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creator) (_ Process, err error) {
if id == "" {
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "exec id must not be empty")
}

View File

@ -15,8 +15,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.0
github.com/docker/go-units v0.3.1
github.com/gogo/protobuf v0.5
github.com/golang/protobuf 1643683e1b54a9e88ad26d98f81400c8c9d9f4f9
github.com/opencontainers/runtime-spec v1.0.0
github.com/opencontainers/runc 74a17296470088de3805e138d3d87c62e613dfc4
github.com/opencontainers/runtime-spec v1.0.1
github.com/opencontainers/runc 7f24b40cc5423969b4554ef04ba0b00e2b4ba010
github.com/sirupsen/logrus v1.0.0
github.com/containerd/btrfs cc52c4dea2ce11a44e6639e561bb5c2af9ada9e3
github.com/stretchr/testify v1.1.4
@ -25,18 +25,17 @@ github.com/pmezard/go-difflib v1.0.0
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c
golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6
google.golang.org/grpc v1.7.2
google.golang.org/grpc v1.7.4
github.com/pkg/errors v0.8.0
github.com/opencontainers/go-digest 21dfd564fd89c944783d00d069f33e3e7123c448
golang.org/x/sys 314a259e304ff91bd6985da2a7149bbf91237993 https://github.com/golang/sys
github.com/opencontainers/image-spec v1.0.0
github.com/opencontainers/image-spec v1.0.1
github.com/containerd/continuity cf279e6ac893682272b4479d4c67fd3abf878b4e
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
github.com/BurntSushi/toml v0.2.0-21-g9906417
github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0
github.com/Microsoft/go-winio v0.4.4
github.com/Microsoft/hcsshim v0.6.7
github.com/Microsoft/opengcs v0.3.2
github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4

View File

@ -51,7 +51,7 @@ Find more [FAQ on the OCI site](https://www.opencontainers.org/faq).
## Roadmap
The [GitHub milestones](https://github.com/opencontainers/image-spec/milestones) lay out the path to the OCI v1.0.0 release in late 2016.
The [GitHub milestones](https://github.com/opencontainers/image-spec/milestones) lay out the path to future improvements.
# Contributing

View File

@ -22,7 +22,7 @@ const (
// VersionMinor is for functionality in a backwards-compatible manner
VersionMinor = 0
// VersionPatch is for backwards-compatible bug fixes
VersionPatch = 0
VersionPatch = 1
// VersionDev indicates development branch. Releases will be empty string.
VersionDev = ""

View File

@ -52,17 +52,12 @@ It also guarantees that the design is sound before code is written; a GitHub pul
Typos and grammatical errors can go straight to a pull-request.
When in doubt, start on the [mailing-list](#mailing-list).
### Weekly Call
The contributors and maintainers of all OCI projects have a weekly meeting on Wednesdays at:
* 8:00 AM (USA Pacific), during [odd weeks][iso-week].
* 2:00 PM (USA Pacific), during [even weeks][iso-week].
### Meetings
The contributors and maintainers of all OCI projects have monthly meetings at 2:00 PM (USA Pacific) on the first Wednesday of every month.
There is an [iCalendar][rfc5545] format for the meetings [here](meeting.ics).
Everyone is welcome to participate via [UberConference web][uberconference] or audio-only: +1 415 968 0849 (no PIN needed).
An initial agenda will be posted to the [mailing list](#mailing-list) earlier in the week, and everyone is welcome to propose additional topics or suggest other agenda alterations there.
An initial agenda will be posted to the [mailing list](#mailing-list) in the week before each meeting, and everyone is welcome to propose additional topics or suggest other agenda alterations there.
Minutes are posted to the [mailing list](#mailing-list) and minutes from past calls are archived [here][minutes], with minutes from especially old meetings (September 2015 and earlier) archived [here][runtime-wiki].
### Mailing List

View File

@ -4,7 +4,7 @@ import "os"
// Spec is the base configuration for the container.
type Spec struct {
// Version of the Open Container Runtime Specification with which the bundle complies.
// Version of the Open Container Initiative Runtime Specification with which the bundle complies.
Version string `json:"ociVersion"`
// Process configures the container process.
Process *Process `json:"process,omitempty"`

View File

@ -8,7 +8,7 @@ const (
// VersionMinor is for functionality in a backwards-compatible manner
VersionMinor = 0
// VersionPatch is for backwards-compatible bug fixes
VersionPatch = 0
VersionPatch = 1
// VersionDev indicates development branch. Releases will be empty string.
VersionDev = ""