abra/pkg/upstream/stack/stack.go

536 lines
17 KiB
Go

package stack // https://github.com/docker/cli/blob/master/cli/command/stack/swarm/common.go
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"time"
"coopcloud.tech/abra/pkg/upstream/convert"
"github.com/docker/cli/cli/command/service/progress"
"github.com/docker/cli/cli/command/stack/formatter"
composetypes "github.com/docker/cli/cli/compose/types"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/client"
dockerClient "github.com/docker/docker/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Resolve image constants
const (
defaultNetworkDriver = "overlay"
ResolveImageAlways = "always"
ResolveImageChanged = "changed"
ResolveImageNever = "never"
)
// Timeout to wait until docker services converge, default is 50s (random choice)
var WaitTimeout int = 50
type StackStatus struct {
Services []swarm.Service
Err error
}
func getStackFilter(namespace string) filters.Args {
filter := filters.NewArgs()
filter.Add("label", convert.LabelNamespace+"="+namespace)
return filter
}
func getStackServiceFilter(namespace string) filters.Args {
return getStackFilter(namespace)
}
func getAllStacksFilter() filters.Args {
filter := filters.NewArgs()
filter.Add("label", convert.LabelNamespace)
return filter
}
func GetStackServices(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Service, error) {
return dockerclient.ServiceList(ctx, types.ServiceListOptions{Filters: getStackServiceFilter(namespace)})
}
// GetDeployedServicesByLabel filters services by label
func GetDeployedServicesByLabel(cl *dockerClient.Client, contextName string, label string) StackStatus {
filters := filters.NewArgs()
filters.Add("label", label)
services, err := cl.ServiceList(context.Background(), types.ServiceListOptions{Filters: filters})
if err != nil {
return StackStatus{[]swarm.Service{}, err}
}
return StackStatus{services, nil}
}
func GetAllDeployedServices(cl *dockerClient.Client, contextName string) StackStatus {
services, err := cl.ServiceList(context.Background(), types.ServiceListOptions{Filters: getAllStacksFilter()})
if err != nil {
return StackStatus{[]swarm.Service{}, err}
}
return StackStatus{services, nil}
}
// GetDeployedServicesByName filters services by name
func GetDeployedServicesByName(ctx context.Context, cl *dockerClient.Client, stackName, serviceName string) StackStatus {
filters := filters.NewArgs()
filters.Add("name", fmt.Sprintf("%s_%s", stackName, serviceName))
services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: filters})
if err != nil {
return StackStatus{[]swarm.Service{}, err}
}
return StackStatus{services, nil}
}
// IsDeployed chekcks whether an appp is deployed or not.
func IsDeployed(ctx context.Context, cl *dockerClient.Client, stackName string) (bool, string, error) {
version := "unknown"
isDeployed := false
filter := filters.NewArgs()
filter.Add("label", fmt.Sprintf("%s=%s", convert.LabelNamespace, stackName))
services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: filter})
if err != nil {
return false, version, err
}
if len(services) > 0 {
for _, service := range services {
labelKey := fmt.Sprintf("coop-cloud.%s.version", stackName)
if deployedVersion, ok := service.Spec.Labels[labelKey]; ok {
version = deployedVersion
break
}
}
logrus.Debugf("%s has been detected as deployed with version %s", stackName, version)
return true, version, nil
}
logrus.Debugf("%s has been detected as not deployed", stackName)
return isDeployed, version, nil
}
// pruneServices removes services that are no longer referenced in the source
func pruneServices(ctx context.Context, cl *dockerClient.Client, namespace convert.Namespace, services map[string]struct{}) {
oldServices, err := GetStackServices(ctx, cl, namespace.Name())
if err != nil {
logrus.Infof("Failed to list services: %s\n", err)
}
pruneServices := []swarm.Service{}
for _, service := range oldServices {
if _, exists := services[namespace.Descope(service.Spec.Name)]; !exists {
pruneServices = append(pruneServices, service)
}
}
removeServices(ctx, cl, pruneServices)
}
// RunDeploy is the swarm implementation of docker stack deploy
func RunDeploy(cl *dockerClient.Client, opts Deploy, cfg *composetypes.Config, appName string, dontWait bool) error {
if err := validateResolveImageFlag(&opts); err != nil {
return err
}
// client side image resolution should not be done when the supported
// server version is older than 1.30
if versions.LessThan(cl.ClientVersion(), "1.30") {
opts.ResolveImage = ResolveImageNever
}
return deployCompose(context.Background(), cl, opts, cfg, appName, dontWait)
}
// validateResolveImageFlag validates the opts.resolveImage command line option
func validateResolveImageFlag(opts *Deploy) error {
switch opts.ResolveImage {
case ResolveImageAlways, ResolveImageChanged, ResolveImageNever:
return nil
default:
return errors.Errorf("Invalid option %s for flag --resolve-image", opts.ResolveImage)
}
}
func deployCompose(ctx context.Context, cl *dockerClient.Client, opts Deploy, config *composetypes.Config, appName string, dontWait bool) error {
namespace := convert.NewNamespace(opts.Namespace)
if opts.Prune {
services := map[string]struct{}{}
for _, service := range config.Services {
services[service.Name] = struct{}{}
}
pruneServices(ctx, cl, namespace, services)
}
serviceNetworks := getServicesDeclaredNetworks(config.Services)
networks, externalNetworks := convert.Networks(namespace, config.Networks, serviceNetworks)
if err := validateExternalNetworks(ctx, cl, externalNetworks); err != nil {
return err
}
if err := createNetworks(ctx, cl, namespace, networks); err != nil {
return err
}
secrets, err := convert.Secrets(namespace, config.Secrets)
if err != nil {
return err
}
if err := createSecrets(ctx, cl, secrets); err != nil {
return err
}
configs, err := convert.Configs(namespace, config.Configs)
if err != nil {
return err
}
if err := createConfigs(ctx, cl, configs); err != nil {
return err
}
services, err := convert.Services(namespace, config, cl)
if err != nil {
return err
}
return deployServices(ctx, cl, services, namespace, opts.SendRegistryAuth, opts.ResolveImage, appName, dontWait)
}
func getServicesDeclaredNetworks(serviceConfigs []composetypes.ServiceConfig) map[string]struct{} {
serviceNetworks := map[string]struct{}{}
for _, serviceConfig := range serviceConfigs {
if len(serviceConfig.Networks) == 0 {
serviceNetworks["default"] = struct{}{}
continue
}
for network := range serviceConfig.Networks {
serviceNetworks[network] = struct{}{}
}
}
return serviceNetworks
}
func validateExternalNetworks(ctx context.Context, client dockerClient.NetworkAPIClient, externalNetworks []string) error {
for _, networkName := range externalNetworks {
if !container.NetworkMode(networkName).IsUserDefined() {
// Networks that are not user defined always exist on all nodes as
// local-scoped networks, so there's no need to inspect them.
continue
}
network, err := client.NetworkInspect(ctx, networkName, types.NetworkInspectOptions{})
switch {
case dockerClient.IsErrNotFound(err):
return errors.Errorf("network %q is declared as external, but could not be found. You need to create a swarm-scoped network before the stack is deployed, which you can do by running this on the server: docker network create -d overlay proxy", networkName)
case err != nil:
return err
case network.Scope != "swarm":
return errors.Errorf("network %q is declared as external, but it is not in the right scope: %q instead of \"swarm\"", networkName, network.Scope)
}
}
return nil
}
func createSecrets(ctx context.Context, cl *dockerClient.Client, secrets []swarm.SecretSpec) error {
for _, secretSpec := range secrets {
secret, _, err := cl.SecretInspectWithRaw(ctx, secretSpec.Name)
switch {
case err == nil:
// secret already exists, then we update that
if err := cl.SecretUpdate(ctx, secret.ID, secret.Meta.Version, secretSpec); err != nil {
return errors.Wrapf(err, "failed to update secret %s", secretSpec.Name)
}
case dockerClient.IsErrNotFound(err):
// secret does not exist, then we create a new one.
logrus.Infof("Creating secret %s\n", secretSpec.Name)
if _, err := cl.SecretCreate(ctx, secretSpec); err != nil {
return errors.Wrapf(err, "failed to create secret %s", secretSpec.Name)
}
default:
return err
}
}
return nil
}
func createConfigs(ctx context.Context, cl *dockerClient.Client, configs []swarm.ConfigSpec) error {
for _, configSpec := range configs {
config, _, err := cl.ConfigInspectWithRaw(ctx, configSpec.Name)
switch {
case err == nil:
// config already exists, then we update that
if err := cl.ConfigUpdate(ctx, config.ID, config.Meta.Version, configSpec); err != nil {
return errors.Wrapf(err, "failed to update config %s", configSpec.Name)
}
case dockerClient.IsErrNotFound(err):
// config does not exist, then we create a new one.
logrus.Infof("Creating config %s\n", configSpec.Name)
if _, err := cl.ConfigCreate(ctx, configSpec); err != nil {
return errors.Wrapf(err, "failed to create config %s", configSpec.Name)
}
default:
return err
}
}
return nil
}
func createNetworks(ctx context.Context, cl *dockerClient.Client, namespace convert.Namespace, networks map[string]types.NetworkCreate) error {
existingNetworks, err := getStackNetworks(ctx, cl, namespace.Name())
if err != nil {
return err
}
existingNetworkMap := make(map[string]types.NetworkResource)
for _, network := range existingNetworks {
existingNetworkMap[network.Name] = network
}
for name, createOpts := range networks {
if _, exists := existingNetworkMap[name]; exists {
continue
}
if createOpts.Driver == "" {
createOpts.Driver = defaultNetworkDriver
}
logrus.Infof("Creating network %s\n", name)
if _, err := cl.NetworkCreate(ctx, name, createOpts); err != nil {
return errors.Wrapf(err, "failed to create network %s", name)
}
}
return nil
}
func deployServices(
ctx context.Context,
cl *dockerClient.Client,
services map[string]swarm.ServiceSpec,
namespace convert.Namespace,
sendAuth bool,
resolveImage string,
appName string,
dontWait bool) error {
existingServices, err := GetStackServices(ctx, cl, namespace.Name())
if err != nil {
return err
}
existingServiceMap := make(map[string]swarm.Service)
for _, service := range existingServices {
existingServiceMap[service.Spec.Name] = service
}
serviceIDs := make(map[string]string)
for internalName, serviceSpec := range services {
var (
name = namespace.Scope(internalName)
image = serviceSpec.TaskTemplate.ContainerSpec.Image
encodedAuth string
)
if service, exists := existingServiceMap[name]; exists {
logrus.Infof("Updating service %s (id: %s)\n", name, service.ID)
updateOpts := types.ServiceUpdateOptions{EncodedRegistryAuth: encodedAuth}
switch resolveImage {
case ResolveImageAlways:
// image should be updated by the server using QueryRegistry
updateOpts.QueryRegistry = true
case ResolveImageChanged:
if image != service.Spec.Labels[convert.LabelImage] {
// Query the registry to resolve digest for the updated image
updateOpts.QueryRegistry = true
} else {
// image has not changed; update the serviceSpec with the
// existing information that was set by QueryRegistry on the
// previous deploy. Otherwise this will trigger an incorrect
// service update.
serviceSpec.TaskTemplate.ContainerSpec.Image = service.Spec.TaskTemplate.ContainerSpec.Image
}
default:
if image == service.Spec.Labels[convert.LabelImage] {
// image has not changed; update the serviceSpec with the
// existing information that was set by QueryRegistry on the
// previous deploy. Otherwise this will trigger an incorrect
// service update.
serviceSpec.TaskTemplate.ContainerSpec.Image = service.Spec.TaskTemplate.ContainerSpec.Image
}
}
// Stack deploy does not have a `--force` option. Preserve existing
// ForceUpdate value so that tasks are not re-deployed if not updated.
serviceSpec.TaskTemplate.ForceUpdate = service.Spec.TaskTemplate.ForceUpdate
response, err := cl.ServiceUpdate(ctx, service.ID, service.Version, serviceSpec, updateOpts)
if err != nil {
return errors.Wrapf(err, "failed to update service %s", name)
}
serviceIDs[service.ID] = name
for _, warning := range response.Warnings {
logrus.Warn(warning)
}
} else {
logrus.Infof("Creating service %s\n", name)
createOpts := types.ServiceCreateOptions{EncodedRegistryAuth: encodedAuth}
// query registry if flag disabling it was not set
if resolveImage == ResolveImageAlways || resolveImage == ResolveImageChanged {
createOpts.QueryRegistry = true
}
serviceCreateResponse, err := cl.ServiceCreate(ctx, serviceSpec, createOpts)
if err != nil {
return errors.Wrapf(err, "failed to create service %s", name)
}
serviceIDs[serviceCreateResponse.ID] = name
}
}
var serviceNames []string
for _, serviceName := range serviceIDs {
serviceNames = append(serviceNames, serviceName)
}
if dontWait {
logrus.Warn("skipping converge logic checks")
return nil
}
logrus.Infof("Waiting for %s to deploy... please hold 🤚", appName)
ch := make(chan error, len(serviceIDs))
for serviceID, serviceName := range serviceIDs {
logrus.Debugf("waiting on %s to converge", serviceName)
go func(sID, sName, aName string) {
ch <- WaitOnService(ctx, cl, sID, aName)
}(serviceID, serviceName, appName)
}
for _, serviceID := range serviceIDs {
err := <-ch
if err != nil {
return err
}
logrus.Debugf("assuming %s converged successfully", serviceID)
}
logrus.Infof("Successfully deployed %s", appName)
return nil
}
func getStackNetworks(ctx context.Context, dockerclient client.APIClient, namespace string) ([]types.NetworkResource, error) {
return dockerclient.NetworkList(ctx, types.NetworkListOptions{Filters: getStackFilter(namespace)})
}
func getStackSecrets(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Secret, error) {
return dockerclient.SecretList(ctx, types.SecretListOptions{Filters: getStackFilter(namespace)})
}
func getStackConfigs(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Config, error) {
return dockerclient.ConfigList(ctx, types.ConfigListOptions{Filters: getStackFilter(namespace)})
}
// https://github.com/docker/cli/blob/master/cli/command/service/helpers.go
// https://github.com/docker/cli/blob/master/cli/command/service/progress/progress.go
func WaitOnService(ctx context.Context, cl *dockerClient.Client, serviceID, appName string) error {
errChan := make(chan error, 1)
pipeReader, pipeWriter := io.Pipe()
sigintChannel := make(chan os.Signal, 1)
signal.Notify(sigintChannel, os.Interrupt)
defer signal.Stop(sigintChannel)
go func() {
errChan <- progress.ServiceProgress(ctx, cl, serviceID, pipeWriter)
}()
go io.Copy(ioutil.Discard, pipeReader)
timeout := time.Duration(WaitTimeout) * time.Second
select {
case err := <-errChan:
return err
case <-sigintChannel:
return fmt.Errorf(fmt.Sprintf(`
Not waiting for %s to deploy. The deployment is ongoing...
If you want to stop the deployment, try:
abra app undeploy %s`, appName, appName))
case <-time.After(timeout):
return fmt.Errorf(fmt.Sprintf(`
%s has not converged (%s second timeout reached).
This does not necessarily mean your deployment has failed, it may just be that
the app is taking longer to deploy based on your server resources or network
latency.
You can track latest deployment status with:
abra app ps --watch %s
And inspect the logs with:
abra app logs %s
If a service is failing to even start, try to smoke out the error with:
abra app errors --watch %s
`, appName, timeout, appName, appName, appName))
}
}
// Copypasta from https://github.com/docker/cli/blob/master/cli/command/stack/swarm/list.go
// GetStacks lists the swarm stacks.
func GetStacks(cl *dockerClient.Client) ([]*formatter.Stack, error) {
services, err := cl.ServiceList(
context.Background(),
types.ServiceListOptions{Filters: getAllStacksFilter()})
if err != nil {
return nil, err
}
m := make(map[string]*formatter.Stack)
for _, service := range services {
labels := service.Spec.Labels
name, ok := labels[convert.LabelNamespace]
if !ok {
return nil, errors.Errorf("cannot get label %s for service %s",
convert.LabelNamespace, service.ID)
}
ztack, ok := m[name]
if !ok {
m[name] = &formatter.Stack{
Name: name,
Services: 1,
}
} else {
ztack.Services++
}
}
var stacks []*formatter.Stack
for _, stack := range m {
stacks = append(stacks, stack)
}
return stacks, nil
}