forked from toolshed/abra
.gitea
cli
cmd
pkg
app
autocomplete
catalogue
client
compose
config
container
context
dns
git
limit
recipe
secret
server
ssh
upstream
commandconn
container
convert
stack
loader.go
options.go
remove.go
stack.go
web
scripts
tests
.drone.yml
.envrc.sample
.gitignore
.goreleaser.yml
Makefile
README.md
go.mod
go.sum
renovate.json
497 lines
16 KiB
Go
497 lines
16 KiB
Go
package stack // https://github.com/docker/cli/blob/master/cli/command/stack/swarm/common.go
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"strings"
|
|
"time"
|
|
|
|
abraClient "coopcloud.tech/abra/pkg/client"
|
|
"coopcloud.tech/abra/pkg/upstream/convert"
|
|
"github.com/docker/cli/cli/command/service/progress"
|
|
composetypes "github.com/docker/cli/cli/compose/types"
|
|
"github.com/docker/docker/api/types"
|
|
"github.com/docker/docker/api/types/container"
|
|
"github.com/docker/docker/api/types/filters"
|
|
"github.com/docker/docker/api/types/swarm"
|
|
"github.com/docker/docker/api/types/versions"
|
|
"github.com/docker/docker/client"
|
|
dockerclient "github.com/docker/docker/client"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// defaultNetworkDriver is the driver given to a stack network that does not
// declare one itself.
const defaultNetworkDriver = "overlay"

// Accepted values for the resolve-image deploy option.
const (
	// ResolveImageAlways asks for the registry to be queried on every service
	// create and update.
	ResolveImageAlways = "always"
	// ResolveImageChanged asks for the registry to be queried on create, and
	// on update only when the image differs from the one recorded on the
	// existing service.
	ResolveImageChanged = "changed"
	// ResolveImageNever disables querying the registry.
	ResolveImageNever = "never"
)
|
|
|
|
type StackStatus struct {
|
|
Services []swarm.Service
|
|
Err error
|
|
}
|
|
|
|
func getStackFilter(namespace string) filters.Args {
|
|
filter := filters.NewArgs()
|
|
filter.Add("label", convert.LabelNamespace+"="+namespace)
|
|
return filter
|
|
}
|
|
|
|
func getStackServiceFilter(namespace string) filters.Args {
|
|
return getStackFilter(namespace)
|
|
}
|
|
|
|
func getAllStacksFilter() filters.Args {
|
|
filter := filters.NewArgs()
|
|
filter.Add("label", convert.LabelNamespace)
|
|
return filter
|
|
}
|
|
|
|
func GetStackServices(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Service, error) {
|
|
return dockerclient.ServiceList(ctx, types.ServiceListOptions{Filters: getStackServiceFilter(namespace)})
|
|
}
|
|
|
|
// GetDeployedServicesByLabel filters services by label
|
|
func GetDeployedServicesByLabel(contextName string, label string) StackStatus {
|
|
cl, err := abraClient.New(contextName)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "does not exist") {
|
|
// No local context found, bail out gracefully
|
|
return StackStatus{[]swarm.Service{}, nil}
|
|
}
|
|
return StackStatus{[]swarm.Service{}, err}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
filters := filters.NewArgs()
|
|
filters.Add("label", label)
|
|
services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: filters})
|
|
if err != nil {
|
|
return StackStatus{[]swarm.Service{}, err}
|
|
}
|
|
|
|
return StackStatus{services, nil}
|
|
}
|
|
|
|
func GetAllDeployedServices(contextName string) StackStatus {
|
|
cl, err := abraClient.New(contextName)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "does not exist") {
|
|
// No local context found, bail out gracefully
|
|
return StackStatus{[]swarm.Service{}, nil}
|
|
}
|
|
return StackStatus{[]swarm.Service{}, err}
|
|
}
|
|
|
|
ctx := context.Background()
|
|
services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: getAllStacksFilter()})
|
|
if err != nil {
|
|
return StackStatus{[]swarm.Service{}, err}
|
|
}
|
|
|
|
return StackStatus{services, nil}
|
|
}
|
|
|
|
// GetDeployedServicesByName filters services by name
|
|
func GetDeployedServicesByName(ctx context.Context, cl *dockerclient.Client, stackName, serviceName string) StackStatus {
|
|
filters := filters.NewArgs()
|
|
filters.Add("name", fmt.Sprintf("%s_%s", stackName, serviceName))
|
|
|
|
services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: filters})
|
|
if err != nil {
|
|
return StackStatus{[]swarm.Service{}, err}
|
|
}
|
|
|
|
return StackStatus{services, nil}
|
|
}
|
|
|
|
// IsDeployed chekcks whether an appp is deployed or not.
|
|
func IsDeployed(ctx context.Context, cl *dockerclient.Client, stackName string) (bool, string, error) {
|
|
version := ""
|
|
isDeployed := false
|
|
|
|
filter := filters.NewArgs()
|
|
filter.Add("label", fmt.Sprintf("%s=%s", convert.LabelNamespace, stackName))
|
|
|
|
services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: filter})
|
|
if err != nil {
|
|
return false, version, err
|
|
}
|
|
|
|
if len(services) > 0 {
|
|
for _, service := range services {
|
|
labelKey := fmt.Sprintf("coop-cloud.%s.version", stackName)
|
|
if deployedVersion, ok := service.Spec.Labels[labelKey]; ok {
|
|
version = deployedVersion
|
|
break
|
|
}
|
|
}
|
|
|
|
logrus.Debugf("'%s' has been detected as deployed with version '%s'", stackName, version)
|
|
|
|
return true, version, nil
|
|
}
|
|
|
|
logrus.Debugf("'%s' has been detected as not deployed", stackName)
|
|
return isDeployed, version, nil
|
|
}
|
|
|
|
// pruneServices removes services that are no longer referenced in the source
|
|
func pruneServices(ctx context.Context, cl *dockerclient.Client, namespace convert.Namespace, services map[string]struct{}) {
|
|
oldServices, err := GetStackServices(ctx, cl, namespace.Name())
|
|
if err != nil {
|
|
logrus.Infof("Failed to list services: %s\n", err)
|
|
}
|
|
|
|
pruneServices := []swarm.Service{}
|
|
for _, service := range oldServices {
|
|
if _, exists := services[namespace.Descope(service.Spec.Name)]; !exists {
|
|
pruneServices = append(pruneServices, service)
|
|
}
|
|
}
|
|
removeServices(ctx, cl, pruneServices)
|
|
}
|
|
|
|
// RunDeploy is the swarm implementation of docker stack deploy
|
|
func RunDeploy(cl *dockerclient.Client, opts Deploy, cfg *composetypes.Config) error {
|
|
ctx := context.Background()
|
|
|
|
if err := validateResolveImageFlag(&opts); err != nil {
|
|
return err
|
|
}
|
|
// client side image resolution should not be done when the supported
|
|
// server version is older than 1.30
|
|
if versions.LessThan(cl.ClientVersion(), "1.30") {
|
|
opts.ResolveImage = ResolveImageNever
|
|
}
|
|
|
|
return deployCompose(ctx, cl, opts, cfg)
|
|
}
|
|
|
|
// validateResolveImageFlag validates the opts.resolveImage command line option
|
|
func validateResolveImageFlag(opts *Deploy) error {
|
|
switch opts.ResolveImage {
|
|
case ResolveImageAlways, ResolveImageChanged, ResolveImageNever:
|
|
return nil
|
|
default:
|
|
return errors.Errorf("Invalid option %s for flag --resolve-image", opts.ResolveImage)
|
|
}
|
|
}
|
|
|
|
func deployCompose(ctx context.Context, cl *dockerclient.Client, opts Deploy, config *composetypes.Config) error {
|
|
namespace := convert.NewNamespace(opts.Namespace)
|
|
|
|
if opts.Prune {
|
|
services := map[string]struct{}{}
|
|
for _, service := range config.Services {
|
|
services[service.Name] = struct{}{}
|
|
}
|
|
pruneServices(ctx, cl, namespace, services)
|
|
}
|
|
|
|
serviceNetworks := getServicesDeclaredNetworks(config.Services)
|
|
networks, externalNetworks := convert.Networks(namespace, config.Networks, serviceNetworks)
|
|
if err := validateExternalNetworks(ctx, cl, externalNetworks); err != nil {
|
|
return err
|
|
}
|
|
if err := createNetworks(ctx, cl, namespace, networks); err != nil {
|
|
return err
|
|
}
|
|
|
|
secrets, err := convert.Secrets(namespace, config.Secrets)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := createSecrets(ctx, cl, secrets); err != nil {
|
|
return err
|
|
}
|
|
|
|
configs, err := convert.Configs(namespace, config.Configs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := createConfigs(ctx, cl, configs); err != nil {
|
|
return err
|
|
}
|
|
|
|
services, err := convert.Services(namespace, config, cl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return deployServices(ctx, cl, services, namespace, opts.SendRegistryAuth, opts.ResolveImage)
|
|
}
|
|
|
|
func getServicesDeclaredNetworks(serviceConfigs []composetypes.ServiceConfig) map[string]struct{} {
|
|
serviceNetworks := map[string]struct{}{}
|
|
for _, serviceConfig := range serviceConfigs {
|
|
if len(serviceConfig.Networks) == 0 {
|
|
serviceNetworks["default"] = struct{}{}
|
|
continue
|
|
}
|
|
for network := range serviceConfig.Networks {
|
|
serviceNetworks[network] = struct{}{}
|
|
}
|
|
}
|
|
return serviceNetworks
|
|
}
|
|
|
|
func validateExternalNetworks(ctx context.Context, client dockerclient.NetworkAPIClient, externalNetworks []string) error {
|
|
for _, networkName := range externalNetworks {
|
|
if !container.NetworkMode(networkName).IsUserDefined() {
|
|
// Networks that are not user defined always exist on all nodes as
|
|
// local-scoped networks, so there's no need to inspect them.
|
|
continue
|
|
}
|
|
network, err := client.NetworkInspect(ctx, networkName, types.NetworkInspectOptions{})
|
|
switch {
|
|
case dockerclient.IsErrNotFound(err):
|
|
return errors.Errorf("network %q is declared as external, but could not be found. You need to create a swarm-scoped network before the stack is deployed", networkName)
|
|
case err != nil:
|
|
return err
|
|
case network.Scope != "swarm":
|
|
return errors.Errorf("network %q is declared as external, but it is not in the right scope: %q instead of \"swarm\"", networkName, network.Scope)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func createSecrets(ctx context.Context, cl *dockerclient.Client, secrets []swarm.SecretSpec) error {
|
|
for _, secretSpec := range secrets {
|
|
secret, _, err := cl.SecretInspectWithRaw(ctx, secretSpec.Name)
|
|
switch {
|
|
case err == nil:
|
|
// secret already exists, then we update that
|
|
if err := cl.SecretUpdate(ctx, secret.ID, secret.Meta.Version, secretSpec); err != nil {
|
|
return errors.Wrapf(err, "failed to update secret %s", secretSpec.Name)
|
|
}
|
|
case dockerclient.IsErrNotFound(err):
|
|
// secret does not exist, then we create a new one.
|
|
logrus.Infof("Creating secret %s\n", secretSpec.Name)
|
|
if _, err := cl.SecretCreate(ctx, secretSpec); err != nil {
|
|
return errors.Wrapf(err, "failed to create secret %s", secretSpec.Name)
|
|
}
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func createConfigs(ctx context.Context, cl *dockerclient.Client, configs []swarm.ConfigSpec) error {
|
|
for _, configSpec := range configs {
|
|
config, _, err := cl.ConfigInspectWithRaw(ctx, configSpec.Name)
|
|
switch {
|
|
case err == nil:
|
|
// config already exists, then we update that
|
|
if err := cl.ConfigUpdate(ctx, config.ID, config.Meta.Version, configSpec); err != nil {
|
|
return errors.Wrapf(err, "failed to update config %s", configSpec.Name)
|
|
}
|
|
case dockerclient.IsErrNotFound(err):
|
|
// config does not exist, then we create a new one.
|
|
logrus.Infof("Creating config %s\n", configSpec.Name)
|
|
if _, err := cl.ConfigCreate(ctx, configSpec); err != nil {
|
|
return errors.Wrapf(err, "failed to create config %s", configSpec.Name)
|
|
}
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func createNetworks(ctx context.Context, cl *dockerclient.Client, namespace convert.Namespace, networks map[string]types.NetworkCreate) error {
|
|
existingNetworks, err := getStackNetworks(ctx, cl, namespace.Name())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
existingNetworkMap := make(map[string]types.NetworkResource)
|
|
for _, network := range existingNetworks {
|
|
existingNetworkMap[network.Name] = network
|
|
}
|
|
|
|
for name, createOpts := range networks {
|
|
if _, exists := existingNetworkMap[name]; exists {
|
|
continue
|
|
}
|
|
|
|
if createOpts.Driver == "" {
|
|
createOpts.Driver = defaultNetworkDriver
|
|
}
|
|
|
|
logrus.Infof("Creating network %s\n", name)
|
|
if _, err := cl.NetworkCreate(ctx, name, createOpts); err != nil {
|
|
return errors.Wrapf(err, "failed to create network %s", name)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func deployServices(
|
|
ctx context.Context,
|
|
cl *dockerclient.Client,
|
|
services map[string]swarm.ServiceSpec,
|
|
namespace convert.Namespace,
|
|
sendAuth bool,
|
|
resolveImage string) error {
|
|
existingServices, err := GetStackServices(ctx, cl, namespace.Name())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
existingServiceMap := make(map[string]swarm.Service)
|
|
for _, service := range existingServices {
|
|
existingServiceMap[service.Spec.Name] = service
|
|
}
|
|
|
|
serviceIDs := make(map[string]string)
|
|
for internalName, serviceSpec := range services {
|
|
var (
|
|
name = namespace.Scope(internalName)
|
|
image = serviceSpec.TaskTemplate.ContainerSpec.Image
|
|
encodedAuth string
|
|
)
|
|
|
|
// FIXME: disable for now as not sure how to avoid having a `dockerCli`
|
|
// instance here and would rather not copy/pasta that entire module in
|
|
// right now for something that we don't even support right now. Will skip
|
|
// this for now.
|
|
if sendAuth {
|
|
// Retrieve encoded auth token from the image reference
|
|
// encodedAuth, err = command.RetrieveAuthTokenFromImage(ctx, dockerCli, image)
|
|
// if err != nil {
|
|
// return err
|
|
// }
|
|
}
|
|
|
|
if service, exists := existingServiceMap[name]; exists {
|
|
logrus.Infof("Updating service %s (id: %s)\n", name, service.ID)
|
|
|
|
updateOpts := types.ServiceUpdateOptions{EncodedRegistryAuth: encodedAuth}
|
|
|
|
switch resolveImage {
|
|
case ResolveImageAlways:
|
|
// image should be updated by the server using QueryRegistry
|
|
updateOpts.QueryRegistry = true
|
|
case ResolveImageChanged:
|
|
if image != service.Spec.Labels[convert.LabelImage] {
|
|
// Query the registry to resolve digest for the updated image
|
|
updateOpts.QueryRegistry = true
|
|
} else {
|
|
// image has not changed; update the serviceSpec with the
|
|
// existing information that was set by QueryRegistry on the
|
|
// previous deploy. Otherwise this will trigger an incorrect
|
|
// service update.
|
|
serviceSpec.TaskTemplate.ContainerSpec.Image = service.Spec.TaskTemplate.ContainerSpec.Image
|
|
}
|
|
default:
|
|
if image == service.Spec.Labels[convert.LabelImage] {
|
|
// image has not changed; update the serviceSpec with the
|
|
// existing information that was set by QueryRegistry on the
|
|
// previous deploy. Otherwise this will trigger an incorrect
|
|
// service update.
|
|
serviceSpec.TaskTemplate.ContainerSpec.Image = service.Spec.TaskTemplate.ContainerSpec.Image
|
|
}
|
|
}
|
|
|
|
// Stack deploy does not have a `--force` option. Preserve existing
|
|
// ForceUpdate value so that tasks are not re-deployed if not updated.
|
|
// TODO move this to API client?
|
|
serviceSpec.TaskTemplate.ForceUpdate = service.Spec.TaskTemplate.ForceUpdate
|
|
|
|
response, err := cl.ServiceUpdate(ctx, service.ID, service.Version, serviceSpec, updateOpts)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to update service %s", name)
|
|
}
|
|
|
|
serviceIDs[service.ID] = name
|
|
|
|
for _, warning := range response.Warnings {
|
|
logrus.Warn(warning)
|
|
}
|
|
} else {
|
|
logrus.Infof("Creating service %s\n", name)
|
|
|
|
createOpts := types.ServiceCreateOptions{EncodedRegistryAuth: encodedAuth}
|
|
|
|
// query registry if flag disabling it was not set
|
|
if resolveImage == ResolveImageAlways || resolveImage == ResolveImageChanged {
|
|
createOpts.QueryRegistry = true
|
|
}
|
|
|
|
serviceCreateResponse, err := cl.ServiceCreate(ctx, serviceSpec, createOpts)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to create service %s", name)
|
|
}
|
|
|
|
serviceIDs[serviceCreateResponse.ID] = name
|
|
}
|
|
}
|
|
|
|
var serviceNames []string
|
|
for _, serviceName := range serviceIDs {
|
|
serviceNames = append(serviceNames, serviceName)
|
|
}
|
|
logrus.Infof("waiting for services to converge: %s", strings.Join(serviceNames, ", "))
|
|
|
|
ch := make(chan error, len(serviceIDs))
|
|
for serviceID, serviceName := range serviceIDs {
|
|
logrus.Debugf("waiting on %s to converge", serviceName)
|
|
go func(s string) {
|
|
ch <- waitOnService(ctx, cl, s)
|
|
}(serviceID)
|
|
}
|
|
|
|
for _, serviceID := range serviceIDs {
|
|
err := <-ch
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logrus.Debugf("assuming %s converged successfully", serviceID)
|
|
}
|
|
|
|
logrus.Info("services converged 👌")
|
|
|
|
return nil
|
|
}
|
|
|
|
func getStackNetworks(ctx context.Context, dockerclient client.APIClient, namespace string) ([]types.NetworkResource, error) {
|
|
return dockerclient.NetworkList(ctx, types.NetworkListOptions{Filters: getStackFilter(namespace)})
|
|
}
|
|
|
|
func getStackSecrets(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Secret, error) {
|
|
return dockerclient.SecretList(ctx, types.SecretListOptions{Filters: getStackFilter(namespace)})
|
|
}
|
|
|
|
func getStackConfigs(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Config, error) {
|
|
return dockerclient.ConfigList(ctx, types.ConfigListOptions{Filters: getStackFilter(namespace)})
|
|
}
|
|
|
|
// https://github.com/docker/cli/blob/master/cli/command/service/helpers.go
|
|
// https://github.com/docker/cli/blob/master/cli/command/service/progress/progress.go
|
|
func waitOnService(ctx context.Context, cl *dockerclient.Client, serviceID string) error {
|
|
errChan := make(chan error, 1)
|
|
pipeReader, pipeWriter := io.Pipe()
|
|
|
|
go func() {
|
|
errChan <- progress.ServiceProgress(ctx, cl, serviceID, pipeWriter)
|
|
}()
|
|
|
|
go io.Copy(ioutil.Discard, pipeReader)
|
|
|
|
timeout := 60 * time.Second
|
|
|
|
select {
|
|
case err := <-errChan:
|
|
return err
|
|
case <-time.After(timeout):
|
|
return fmt.Errorf("%s has still not converged (%s second timeout)?", serviceID, timeout)
|
|
}
|
|
}
|