package stack // https://github.com/docker/cli/blob/master/cli/command/stack/swarm/common.go import ( "context" "fmt" "os" "path/filepath" "strconv" "strings" "time" stdlibErr "errors" tea "github.com/charmbracelet/bubbletea" "coopcloud.tech/abra/pkg/config" "coopcloud.tech/abra/pkg/log" "coopcloud.tech/abra/pkg/ui" "coopcloud.tech/abra/pkg/upstream/convert" "github.com/docker/cli/cli/command/stack/formatter" composetypes "github.com/docker/cli/cli/compose/types" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" networktypes "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/versions" "github.com/docker/docker/client" dockerClient "github.com/docker/docker/client" "github.com/pkg/errors" ) // Resolve image constants const ( defaultNetworkDriver = "overlay" ResolveImageAlways = "always" ResolveImageChanged = "changed" ResolveImageNever = "never" ) // Timeout to wait until docker services converge, default is 50s (random choice) var WaitTimeout = 50 type StackStatus struct { Services []swarm.Service Err error } func getStackFilter(namespace string) filters.Args { filter := filters.NewArgs() filter.Add("label", convert.LabelNamespace+"="+namespace) return filter } func getStackServiceFilter(namespace string) filters.Args { return getStackFilter(namespace) } func getAllStacksFilter() filters.Args { filter := filters.NewArgs() filter.Add("label", convert.LabelNamespace) return filter } func GetStackServices(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Service, error) { return dockerclient.ServiceList(ctx, types.ServiceListOptions{Filters: getStackServiceFilter(namespace)}) } // GetDeployedServicesByLabel filters services by label func GetDeployedServicesByLabel(cl *dockerClient.Client, contextName string, label string) StackStatus { filters := filters.NewArgs() filters.Add("label", label) services, err := cl.ServiceList(context.Background(), types.ServiceListOptions{Filters: filters}) if err != nil { return StackStatus{[]swarm.Service{}, err} } return StackStatus{services, nil} } func GetAllDeployedServices(cl *dockerClient.Client, contextName string) StackStatus { services, err := cl.ServiceList(context.Background(), types.ServiceListOptions{Filters: getAllStacksFilter()}) if err != nil { return StackStatus{[]swarm.Service{}, err} } return StackStatus{services, nil} } // GetDeployedServicesByName filters services by name func GetDeployedServicesByName(ctx context.Context, cl *dockerClient.Client, stackName, serviceName string) StackStatus { filters := filters.NewArgs() filters.Add("name", fmt.Sprintf("%s_%s", stackName, serviceName)) services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: filters}) if err != nil { return StackStatus{[]swarm.Service{}, err} } return StackStatus{services, nil} } // DeployMeta is runtime metadata about an app deployment. type DeployMeta struct { IsDeployed bool // whether the app is deployed or not Version string // the deployed version IsChaos bool // whether or not the deployment is --chaos ChaosVersion string // the --chaos deployment version } func (d DeployMeta) String() string { var out string out += fmt.Sprintf("{isDeployed: %v, ", d.IsDeployed) out += fmt.Sprintf("version: %s, ", d.Version) out += fmt.Sprintf("isChaos: %v, ", d.IsChaos) out += fmt.Sprintf("chaosVersion: %s}", d.ChaosVersion) return out } // IsDeployed gathers metadata about an app deployment. func IsDeployed(ctx context.Context, cl *dockerClient.Client, stackName string) (DeployMeta, error) { deployMeta := DeployMeta{ IsDeployed: false, Version: "unknown", IsChaos: false, ChaosVersion: config.CHAOS_DEFAULT, } filter := filters.NewArgs() filter.Add("label", fmt.Sprintf("%s=%s", convert.LabelNamespace, stackName)) services, err := cl.ServiceList(ctx, types.ServiceListOptions{Filters: filter}) if err != nil { return deployMeta, err } if len(services) > 0 { deployMeta.IsDeployed = true for _, service := range services { splitter := fmt.Sprintf("%s_", stackName) serviceName := strings.Split(service.Spec.Name, splitter)[1] if serviceName == "app" { labelKey := fmt.Sprintf("coop-cloud.%s.version", stackName) if deployedVersion, ok := service.Spec.Labels[labelKey]; ok { deployMeta.Version = deployedVersion } labelKey = fmt.Sprintf("coop-cloud.%s.chaos", stackName) if isChaos, ok := service.Spec.Labels[labelKey]; ok { boolVal, err := strconv.ParseBool(isChaos) if err != nil { return deployMeta, fmt.Errorf("unable to parse '%s' value as bool: %s", labelKey, err) } deployMeta.IsChaos = boolVal } labelKey = fmt.Sprintf("coop-cloud.%s.chaos-version", stackName) if chaosVersion, ok := service.Spec.Labels[labelKey]; ok { deployMeta.ChaosVersion = chaosVersion } } } log.Debugf("%s has been detected as deployed: %v", stackName, deployMeta) return deployMeta, nil } log.Debugf("%s has been detected as not deployed", stackName) return deployMeta, nil } // pruneServices removes services that are no longer referenced in the source func pruneServices(ctx context.Context, cl *dockerClient.Client, namespace convert.Namespace, services map[string]struct{}) { oldServices, err := GetStackServices(ctx, cl, namespace.Name()) if err != nil { log.Warnf("failed to list services: %s", err) } pruneServices := []swarm.Service{} for _, service := range oldServices { if _, exists := services[namespace.Descope(service.Spec.Name)]; !exists { pruneServices = append(pruneServices, service) } } removeServices(ctx, cl, pruneServices) } // RunDeploy is the swarm implementation of docker stack deploy func RunDeploy( cl *dockerClient.Client, opts Deploy, cfg *composetypes.Config, appName string, serverName string, dontWait bool, filters filters.Args, ) error { log.Info("initialising deployment") if err := validateResolveImageFlag(&opts); err != nil { return err } // client side image resolution should not be done when the supported // server version is older than 1.30 if versions.LessThan(cl.ClientVersion(), "1.30") { opts.ResolveImage = ResolveImageNever } return deployCompose( context.Background(), cl, opts, cfg, appName, serverName, dontWait, filters, ) } // validateResolveImageFlag validates the opts.resolveImage command line option func validateResolveImageFlag(opts *Deploy) error { switch opts.ResolveImage { case ResolveImageAlways, ResolveImageChanged, ResolveImageNever: return nil default: return errors.Errorf("invalid option %s for flag --resolve-image", opts.ResolveImage) } } func deployCompose( ctx context.Context, cl *dockerClient.Client, opts Deploy, config *composetypes.Config, appName string, serverName string, dontWait bool, filters filters.Args, ) error { namespace := convert.NewNamespace(opts.Namespace) if opts.Prune { services := map[string]struct{}{} for _, service := range config.Services { services[service.Name] = struct{}{} } pruneServices(ctx, cl, namespace, services) } serviceNetworks := getServicesDeclaredNetworks(config.Services) networks, externalNetworks := convert.Networks(namespace, config.Networks, serviceNetworks) if err := validateExternalNetworks(ctx, cl, externalNetworks); err != nil { return err } if err := createNetworks(ctx, cl, namespace, networks); err != nil { return err } secrets, err := convert.Secrets(namespace, config.Secrets) if err != nil { return err } if err := createSecrets(ctx, cl, secrets); err != nil { return err } configs, err := convert.Configs(namespace, config.Configs) if err != nil { return err } if err := createConfigs(ctx, cl, configs); err != nil { return err } services, err := convert.Services(namespace, config, cl) if err != nil { return err } serviceIDs, err := deployServices( ctx, cl, services, namespace, opts.SendRegistryAuth, opts.ResolveImage, ) if err != nil { return err } if dontWait { log.Warn("skipping converge logic checks") return nil } waitOpts := WaitOpts{ Services: serviceIDs, AppName: appName, ServerName: serverName, Filters: filters, } if err := WaitOnServices(ctx, cl, waitOpts); err != nil { return err } return nil } func getServicesDeclaredNetworks(serviceConfigs []composetypes.ServiceConfig) map[string]struct{} { serviceNetworks := map[string]struct{}{} for _, serviceConfig := range serviceConfigs { if len(serviceConfig.Networks) == 0 { serviceNetworks["default"] = struct{}{} continue } for network := range serviceConfig.Networks { serviceNetworks[network] = struct{}{} } } return serviceNetworks } func validateExternalNetworks(ctx context.Context, client dockerClient.NetworkAPIClient, externalNetworks []string) error { for _, networkName := range externalNetworks { if !container.NetworkMode(networkName).IsUserDefined() { // Networks that are not user defined always exist on all nodes as // local-scoped networks, so there's no need to inspect them. continue } network, err := client.NetworkInspect(ctx, networkName, networktypes.InspectOptions{}) switch { case dockerClient.IsErrNotFound(err): return errors.Errorf("network %q is declared as external, but could not be found. You need to create a swarm-scoped network before the stack is deployed, which you can do by running this on the server: docker network create -d overlay proxy", networkName) case err != nil: return err case network.Scope != "swarm": return errors.Errorf("network %q is declared as external, but it is not in the right scope: %q instead of \"swarm\"", networkName, network.Scope) } } return nil } func createSecrets(ctx context.Context, cl *dockerClient.Client, secrets []swarm.SecretSpec) error { for _, secretSpec := range secrets { secret, _, err := cl.SecretInspectWithRaw(ctx, secretSpec.Name) switch { case err == nil: // secret already exists, then we update that if err := cl.SecretUpdate(ctx, secret.ID, secret.Meta.Version, secretSpec); err != nil { return errors.Wrapf(err, "failed to update secret %s", secretSpec.Name) } case dockerClient.IsErrNotFound(err): // secret does not exist, then we create a new one. log.Infof("creating secret %s", secretSpec.Name) if _, err := cl.SecretCreate(ctx, secretSpec); err != nil { return errors.Wrapf(err, "failed to create secret %s", secretSpec.Name) } default: return err } } return nil } func createConfigs(ctx context.Context, cl *dockerClient.Client, configs []swarm.ConfigSpec) error { for _, configSpec := range configs { config, _, err := cl.ConfigInspectWithRaw(ctx, configSpec.Name) switch { case err == nil: // config already exists, then we update that if err := cl.ConfigUpdate(ctx, config.ID, config.Meta.Version, configSpec); err != nil { return errors.Wrapf(err, "failed to update config %s", configSpec.Name) } case dockerClient.IsErrNotFound(err): // config does not exist, then we create a new one. log.Debugf("creating config %s", configSpec.Name) if _, err := cl.ConfigCreate(ctx, configSpec); err != nil { return errors.Wrapf(err, "failed to create config %s", configSpec.Name) } default: return err } } return nil } func createNetworks(ctx context.Context, cl *dockerClient.Client, namespace convert.Namespace, networks map[string]networktypes.CreateOptions) error { existingNetworks, err := getStackNetworks(ctx, cl, namespace.Name()) if err != nil { return err } existingNetworkMap := make(map[string]networktypes.Inspect) for _, network := range existingNetworks { existingNetworkMap[network.Name] = network } for name, createOpts := range networks { if _, exists := existingNetworkMap[name]; exists { continue } if createOpts.Driver == "" { createOpts.Driver = defaultNetworkDriver } log.Debugf("creating network %s", name) if _, err := cl.NetworkCreate(ctx, name, createOpts); err != nil { return errors.Wrapf(err, "failed to create network %s", name) } } return nil } func deployServices( ctx context.Context, cl *dockerClient.Client, services map[string]swarm.ServiceSpec, namespace convert.Namespace, sendAuth bool, resolveImage string) ([]ui.ServiceMeta, error) { var servicesMeta []ui.ServiceMeta existingServices, err := GetStackServices(ctx, cl, namespace.Name()) if err != nil { return servicesMeta, err } existingServiceMap := make(map[string]swarm.Service) for _, service := range existingServices { existingServiceMap[service.Spec.Name] = service } for internalName, serviceSpec := range services { var ( name = namespace.Scope(internalName) image = serviceSpec.TaskTemplate.ContainerSpec.Image encodedAuth string ) if service, exists := existingServiceMap[name]; exists { log.Debugf("updating %s", name) updateOpts := types.ServiceUpdateOptions{EncodedRegistryAuth: encodedAuth} switch resolveImage { case ResolveImageAlways: // image should be updated by the server using QueryRegistry updateOpts.QueryRegistry = true case ResolveImageChanged: if image != service.Spec.Labels[convert.LabelImage] { // Query the registry to resolve digest for the updated image updateOpts.QueryRegistry = true } else { // image has not changed; update the serviceSpec with the // existing information that was set by QueryRegistry on the // previous deploy. Otherwise this will trigger an incorrect // service update. serviceSpec.TaskTemplate.ContainerSpec.Image = service.Spec.TaskTemplate.ContainerSpec.Image } default: if image == service.Spec.Labels[convert.LabelImage] { // image has not changed; update the serviceSpec with the // existing information that was set by QueryRegistry on the // previous deploy. Otherwise this will trigger an incorrect // service update. serviceSpec.TaskTemplate.ContainerSpec.Image = service.Spec.TaskTemplate.ContainerSpec.Image } } // Stack deploy does not have a `--force` option. Preserve existing // ForceUpdate value so that tasks are not re-deployed if not updated. serviceSpec.TaskTemplate.ForceUpdate = service.Spec.TaskTemplate.ForceUpdate response, err := cl.ServiceUpdate(ctx, service.ID, service.Version, serviceSpec, updateOpts) if err != nil { return nil, errors.Wrapf(err, "failed to update %s", name) } for _, warning := range response.Warnings { log.Warn(warning) } servicesMeta = append(servicesMeta, ui.ServiceMeta{ Name: name, ID: service.ID, }) } else { log.Debugf("creating %s", name) createOpts := types.ServiceCreateOptions{EncodedRegistryAuth: encodedAuth} // query registry if flag disabling it was not set if resolveImage == ResolveImageAlways || resolveImage == ResolveImageChanged { createOpts.QueryRegistry = true } serviceCreateResponse, err := cl.ServiceCreate(ctx, serviceSpec, createOpts) if err != nil { return nil, errors.Wrapf(err, "failed to create %s", name) } servicesMeta = append(servicesMeta, ui.ServiceMeta{ Name: name, ID: serviceCreateResponse.ID, }) } } return servicesMeta, nil } func getStackNetworks(ctx context.Context, dockerclient client.APIClient, namespace string) ([]networktypes.Inspect, error) { return dockerclient.NetworkList(ctx, networktypes.ListOptions{Filters: getStackFilter(namespace)}) } func getStackSecrets(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Secret, error) { return dockerclient.SecretList(ctx, types.SecretListOptions{Filters: getStackFilter(namespace)}) } func getStackConfigs(ctx context.Context, dockerclient client.APIClient, namespace string) ([]swarm.Config, error) { return dockerclient.ConfigList(ctx, types.ConfigListOptions{Filters: getStackFilter(namespace)}) } func timestamp() string { ts := time.Now().UTC().Format(time.RFC3339) return strings.Replace(ts, ":", "", -1) // get rid of offensive colons } type WaitOpts struct { AppName string Filters filters.Args NoLog bool Quiet bool ServerName string Services []ui.ServiceMeta } func WaitOnServices(ctx context.Context, cl *dockerClient.Client, opts WaitOpts) error { timeout := time.Duration(WaitTimeout) * time.Second model := ui.DeployInitialModel(ctx, cl, opts.Services, opts.AppName, timeout, opts.Filters) tui := tea.NewProgram(model) if !opts.Quiet { log.Info("polling deployment status") } m, err := log.Without( func() (tea.Model, error) { return tui.Run() }, ) if err != nil { return fmt.Errorf("waitOnServices: error running TUI: %s", err) } deployModel := m.(ui.Model) if deployModel.TimedOut || deployModel.Failed || deployModel.Quit { var errs []error if deployModel.Failed { errs = append(errs, fmt.Errorf("deploy failed 🛑")) } else if deployModel.TimedOut { errs = append(errs, fmt.Errorf("deploy timed out 🟠")) } else { errs = append(errs, fmt.Errorf("deploy in progress 🟠")) } for _, s := range *deployModel.Streams { if s.Err != nil { errs = append(errs, fmt.Errorf("%s: %s", s.Name, s.Err)) } } if len(*deployModel.Logs) > 0 && !opts.NoLog { logsPath := filepath.Join( config.LOGS_DIR, opts.ServerName, fmt.Sprintf("%s_%s", opts.AppName, timestamp()), ) if err := os.MkdirAll(filepath.Join(config.LOGS_DIR, opts.ServerName), 0764); err != nil { return fmt.Errorf("waitOnServices: error creating log dir: %s", err) } file, err := os.Create(logsPath) if err != nil { return fmt.Errorf("waitOnServices: error opening file: %s", err) } defer file.Close() s := strings.Join(*deployModel.Logs, "\n") if _, err := file.WriteString(s); err != nil { return fmt.Errorf("waitOnServices: writeFile: %s", err) } errs = append(errs, fmt.Errorf("logs: %s", logsPath)) } return stdlibErr.Join(errs...) } if !opts.Quiet { log.Info("deploy succeeded 🟢") } return nil } // Copypasta from https://github.com/docker/cli/blob/master/cli/command/stack/swarm/list.go // GetStacks lists the swarm stacks. func GetStacks(cl *dockerClient.Client) ([]*formatter.Stack, error) { services, err := cl.ServiceList( context.Background(), types.ServiceListOptions{Filters: getAllStacksFilter()}) if err != nil { return nil, err } m := make(map[string]*formatter.Stack) for _, service := range services { labels := service.Spec.Labels name, ok := labels[convert.LabelNamespace] if !ok { return nil, errors.Errorf("cannot get label %s for %s", convert.LabelNamespace, service.ID) } ztack, ok := m[name] if !ok { m[name] = &formatter.Stack{ Name: name, Services: 1, } } else { ztack.Services++ } } var stacks []*formatter.Stack for _, stack := range m { stacks = append(stacks, stack) } return stacks, nil }