feat: improved deploy progress reporting

See toolshed/abra#478
committed by decentral1se, 2025-03-20 14:23:09 +01:00
parent d0f982456e
commit 47045ca8f1
85 changed files with 8828 additions and 360 deletions


@@ -90,6 +90,7 @@ func (a Abra) GetAbraDir() string {
func (a Abra) GetServersDir() string { return path.Join(a.GetAbraDir(), "servers") }
func (a Abra) GetRecipesDir() string { return path.Join(a.GetAbraDir(), "recipes") }
func (a Abra) GetLogsDir() string { return path.Join(a.GetAbraDir(), "logs") }
func (a Abra) GetVendorDir() string { return path.Join(a.GetAbraDir(), "vendor") }
func (a Abra) GetBackupDir() string { return path.Join(a.GetAbraDir(), "backups") }
func (a Abra) GetCatalogueDir() string { return path.Join(a.GetAbraDir(), "catalogue") }
@@ -100,6 +101,7 @@ var (
ABRA_DIR = config.GetAbraDir()
SERVERS_DIR = config.GetServersDir()
RECIPES_DIR = config.GetRecipesDir()
LOGS_DIR = config.GetLogsDir()
VENDOR_DIR = config.GetVendorDir()
BACKUP_DIR = config.GetBackupDir()
CATALOGUE_DIR = config.GetCatalogueDir()
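
For orientation: these helpers all hang off the Abra dir, which (assuming abra's usual behaviour) is `$ABRA_DIR` if set, else `$HOME/.abra`. A sketch of where the new logs directory lands under that assumption:

```go
package main

import (
	"fmt"
	"os"
	"path"
)

func main() {
	// Assumption: mirrors config.GetAbraDir's default resolution.
	abraDir := os.Getenv("ABRA_DIR")
	if abraDir == "" {
		abraDir = path.Join(os.Getenv("HOME"), ".abra")
	}
	fmt.Println(path.Join(abraDir, "logs")) // e.g. /home/user/.abra/logs
}
```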


@@ -2,8 +2,10 @@
package log
import (
"math"
"os"
tea "github.com/charmbracelet/bubbletea"
charmLog "github.com/charmbracelet/log"
)
@@ -32,3 +34,13 @@ var SetLevel = Logger.SetLevel
var DebugLevel = charmLog.DebugLevel
var SetOutput = charmLog.SetOutput
var SetReportCaller = charmLog.SetReportCaller
type f func() (tea.Model, error)
func Without(fn f) (tea.Model, error) {
l := Logger.GetLevel()
Logger.SetLevel(math.MaxInt)
m, err := fn()
Logger.SetLevel(l)
return m, err
}
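
The new `Without` helper raises the log level above every real level while a Bubble Tea program owns the terminal, then restores it, so stray log lines cannot tear the TUI's rendering. A minimal usage sketch, assuming only the exports shown above (the model here is hypothetical):

```go
package main

import (
	"coopcloud.tech/abra/pkg/log"

	tea "github.com/charmbracelet/bubbletea"
)

// demoModel is a hypothetical placeholder for any Bubble Tea model.
type demoModel struct{}

func (m demoModel) Init() tea.Cmd                       { return tea.Quit }
func (m demoModel) Update(tea.Msg) (tea.Model, tea.Cmd) { return m, nil }
func (m demoModel) View() string                        { return "running..." }

func main() {
	// Logging is silenced for the duration of fn, then restored.
	if _, err := log.Without(func() (tea.Model, error) {
		return tea.NewProgram(demoModel{}).Run()
	}); err != nil {
		log.Logger.Error(err)
	}
}
```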

pkg/logs/logs.go (new file, 104 lines)

@@ -0,0 +1,104 @@
package logs
import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/signal"
"sync"
"github.com/docker/docker/api/types"
containerTypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
dockerClient "github.com/docker/docker/client"
)
type TailOpts struct {
AppName string
Services []string
StdErr bool
Since string
Buffer *[]string
ToBuffer bool
Filters filters.Args
}
// TailLogs tails logs for the given app, optionally filtered by service
// name. Logs are either streamed to os.Stdout or gathered into a buffer.
func TailLogs(
cl *dockerClient.Client,
opts TailOpts,
) error {
sigIntCh := make(chan os.Signal, 1)
signal.Notify(sigIntCh, os.Interrupt)
defer signal.Stop(sigIntCh)
services, err := cl.ServiceList(
context.Background(),
types.ServiceListOptions{Filters: opts.Filters},
)
if err != nil {
return err
}
errCh := make(chan error)
waitCh := make(chan struct{})
go func() {
var wg sync.WaitGroup
for _, service := range services {
wg.Add(1)
go func(serviceID string) {
tail := "50"
if opts.ToBuffer {
// NOTE(d1): keep more pre-deployment log lines when gathering to a file for analysis
tail = "150"
}
logs, err := cl.ServiceLogs(context.Background(), serviceID, containerTypes.LogsOptions{
ShowStderr: true,
ShowStdout: !opts.StdErr,
Since: opts.Since,
Until: "",
Timestamps: true,
Follow: true,
Tail: tail,
Details: false,
})
if err == nil {
defer logs.Close()
if opts.ToBuffer {
buf := bufio.NewScanner(logs)
for buf.Scan() {
line := fmt.Sprintf("%s: %s", service.Spec.Name, buf.Text())
*opts.Buffer = append(*opts.Buffer, line)
}
logs.Close()
return
}
if _, err = io.Copy(os.Stdout, logs); err != nil && err != io.EOF {
errCh <- fmt.Errorf("tailLogs: unable to copy buffer: %s", err)
}
}
}(service.ID)
}
wg.Wait()
close(waitCh)
}()
select {
case <-waitCh:
return nil
case <-sigIntCh:
return nil
case err := <-errCh:
return err
}
return nil
}
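
A sketch of calling `TailLogs` to gather logs into a buffer rather than streaming them. The swarm stack label used in the filter is standard Docker behaviour; the app name is a placeholder:

```go
package main

import (
	"fmt"

	"coopcloud.tech/abra/pkg/logs"
	"github.com/docker/docker/api/types/filters"
	dockerClient "github.com/docker/docker/client"
)

func main() {
	cl, err := dockerClient.NewClientWithOpts(
		dockerClient.FromEnv,
		dockerClient.WithAPIVersionNegotiation(),
	)
	if err != nil {
		panic(err)
	}

	// Swarm labels every service it deploys with the stack namespace, so
	// this selects all services of one app.
	f := filters.NewArgs()
	f.Add("label", "com.docker.stack.namespace=myapp_example_com") // placeholder app

	var buf []string
	err = logs.TailLogs(cl, logs.TailOpts{
		AppName:  "myapp_example_com",
		StdErr:   true,
		Buffer:   &buf,
		ToBuffer: true,
		Filters:  f,
	})
	if err != nil {
		panic(err)
	}

	// TailLogs returns on interrupt; buf then holds "<service>: <line>" entries.
	fmt.Printf("gathered %d log lines\n", len(buf))
}
```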

pkg/ui/deploy.go (new file, 353 lines)

@@ -0,0 +1,353 @@
package ui
import (
"context"
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"time"
"coopcloud.tech/abra/pkg/formatter"
"coopcloud.tech/abra/pkg/logs"
tea "github.com/charmbracelet/bubbletea"
"github.com/docker/cli/cli/command/service/progress"
containerTypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
dockerClient "github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
)
var IsRunning bool
type statusMsg struct {
stream stream
jsonMsg jsonmessage.JSONMessage
}
type progressCompleteMsg struct {
stream stream
failed bool
}
type healthcheckMsg struct {
stream stream
health string
}
type ServiceMeta struct {
Name string
ID string
}
type Model struct {
appName string
cl *dockerClient.Client
count int
ctx context.Context
timeout time.Duration
width int
filters filters.Args
Streams *[]stream
Logs *[]string
Failed bool
TimedOut bool
Quit bool
}
func (m Model) complete() bool {
return m.count == len(*m.Streams)
}
type stream struct {
Name string
Err error
decoder *json.Decoder
id string
reader *io.PipeReader
writer *io.PipeWriter
status string
retries int
health string
rollback bool
}
func (s stream) String() string {
out := fmt.Sprintf("{decoder: %v, ", s.decoder)
out += fmt.Sprintf("err: %v, ", s.Err)
out += fmt.Sprintf("id: %s, ", s.id)
out += fmt.Sprintf("name: %s, ", s.Name)
out += fmt.Sprintf("reader: %v, ", s.reader)
out += fmt.Sprintf("writer: %v, ", s.writer)
out += fmt.Sprintf("status: %s, ", s.status)
return out
}
func (s stream) progress(m Model) tea.Msg {
if err := progress.ServiceProgress(m.ctx, m.cl, s.id, s.writer); err != nil {
return progressCompleteMsg{
stream: s,
failed: true,
}
}
return progressCompleteMsg{stream: s}
}
func (s stream) process() tea.Msg {
var jsonMsg jsonmessage.JSONMessage
if err := s.decoder.Decode(&jsonMsg); err != nil {
if err == io.EOF {
// NOTE(d1): io.EOF means the stream is done, stop processing messages
return nil
}
}
return statusMsg{
stream: s,
jsonMsg: jsonMsg,
}
}
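
Each stream couples docker's progress reporter to the TUI through an in-memory pipe: `progress.ServiceProgress` writes `jsonmessage.JSONMessage` values into `s.writer`, and `process` decodes them from the paired reader, returning `nil` on `io.EOF` (note that non-EOF decode errors fall through and surface as a zero-valued `statusMsg`). The plumbing in isolation, with a hand-rolled writer standing in for docker:

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/docker/docker/pkg/jsonmessage"
)

func main() {
	r, w := io.Pipe()
	dec := json.NewDecoder(r)

	// Stand-in for progress.ServiceProgress: emit two messages, then close
	// the writer so the decoder sees io.EOF, mirroring end-of-stream above.
	go func() {
		enc := json.NewEncoder(w)
		_ = enc.Encode(jsonmessage.JSONMessage{ID: "web", Status: "starting"})
		_ = enc.Encode(jsonmessage.JSONMessage{ID: "web", Status: "running"})
		w.Close()
	}()

	for {
		var msg jsonmessage.JSONMessage
		if err := dec.Decode(&msg); err != nil {
			if err == io.EOF {
				return // end of stream
			}
			fmt.Println("decode error:", err)
			return
		}
		fmt.Printf("%s: %s\n", msg.ID, msg.Status)
	}
}
```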
func (s stream) healthcheck(m Model) tea.Msg {
filters := filters.NewArgs()
filters.Add("name", fmt.Sprintf("^%s", s.Name))
containers, err := m.cl.ContainerList(m.ctx, containerTypes.ListOptions{Filters: filters})
if err != nil {
s.Err = err
return healthcheckMsg{stream: s}
}
if len(containers) == 0 {
return healthcheckMsg{stream: s}
}
container := containers[0]
containerState, err := m.cl.ContainerInspect(m.ctx, container.ID)
if err != nil {
s.Err = err
return healthcheckMsg{stream: s}
}
var health string
if containerState.State.Health != nil {
health = containerState.State.Health.Status
}
return healthcheckMsg{stream: s, health: health}
}
func DeployInitialModel(
ctx context.Context,
cl *dockerClient.Client,
services []ServiceMeta,
appName string,
timeout time.Duration,
filters filters.Args,
) Model {
var streams []stream
for _, service := range services {
r, w := io.Pipe()
d := json.NewDecoder(r)
streams = append(streams, stream{
Name: service.Name,
id: service.ID,
reader: r,
writer: w,
decoder: d,
retries: -1, // NOTE(d1): skip first attempt
health: "?",
})
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].Name < streams[j].Name
})
return Model{
ctx: ctx,
cl: cl,
appName: appName,
timeout: timeout,
filters: filters,
Streams: &streams,
Logs: &[]string{},
}
}
func (m Model) Init() tea.Cmd {
var cmds []tea.Cmd
for _, stream := range *m.Streams {
cmds = append(
cmds,
[]tea.Cmd{
func() tea.Msg { return stream.progress(m) },
func() tea.Msg { return stream.process() },
func() tea.Msg { return stream.healthcheck(m) },
}...,
)
}
cmds = append(cmds, func() tea.Msg { return deployTimeout(m) })
cmds = append(cmds, func() tea.Msg { return m.gatherLogs() })
return tea.Batch(cmds...)
}
func (m Model) gatherLogs() tea.Msg {
var services []string
for _, s := range *m.Streams {
services = append(services, s.Name)
}
opts := logs.TailOpts{
AppName: m.appName,
Services: services,
StdErr: true,
Buffer: m.Logs,
ToBuffer: true,
Filters: m.filters,
}
// NOTE(d1): log polling errors are deliberately ignored. if logs are missing,
// the cause will hopefully be self-evident from the deployment outcome itself
logs.TailLogs(m.cl, opts)
return nil
}
type timeoutMsg struct{}
func deployTimeout(m Model) tea.Msg {
<-time.After(m.timeout)
return timeoutMsg{}
}
func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
var cmds []tea.Cmd
switch msg := msg.(type) {
case tea.KeyMsg:
switch msg.String() {
case "ctrl+c", "q":
m.Quit = true
return m, tea.Quit
}
case tea.WindowSizeMsg:
m.width = msg.Width
case progressCompleteMsg:
if msg.failed {
m.Failed = true
}
m.count += 1
if m.complete() {
return m, tea.Quit
}
case timeoutMsg:
m.TimedOut = true
return m, tea.Quit
case healthcheckMsg:
for idx, s := range *m.Streams {
if s.id == msg.stream.id {
h := "?"
if s.health != "" {
h = s.health
}
if msg.health != "" {
h = msg.health
}
(*m.Streams)[idx].health = h
}
}
cmds = append(
cmds,
func() tea.Msg { return msg.stream.healthcheck(m) },
)
case statusMsg:
for idx, s := range *m.Streams {
if s.id == msg.stream.id {
if msg.jsonMsg.ID == "rollback" {
m.Failed = true
(*m.Streams)[idx].rollback = true
}
if msg.jsonMsg.ID != "overall progress" {
newStatus := strings.ToLower(msg.jsonMsg.Status)
currentStatus := (*m.Streams)[idx].status
if !strings.Contains(currentStatus, "starting") &&
strings.Contains(newStatus, "starting") {
(*m.Streams)[idx].retries += 1
}
if (*m.Streams)[idx].rollback {
if msg.jsonMsg.ID == "rollback" {
(*m.Streams)[idx].status = newStatus
}
} else {
(*m.Streams)[idx].status = newStatus
}
}
}
}
cmds = append(
cmds,
func() tea.Msg { return msg.stream.process() },
)
}
return m, tea.Batch(cmds...)
}
func (m Model) View() string {
body := strings.Builder{}
for _, stream := range *m.Streams {
split := strings.Split(stream.Name, "_")
short := split[len(split)-1]
status := stream.status
if strings.Contains(stream.status, "converged") && !stream.rollback {
status = "succeeded"
}
if strings.Contains(stream.status, "rolled back") {
status = "rolled back"
}
retries := 0
if stream.retries > 0 {
retries = stream.retries
}
output := fmt.Sprintf("%s: %s (retries: %v, healthcheck: %s)",
formatter.BoldStyle.Render(short),
status,
retries,
stream.health,
)
body.WriteString(output)
body.WriteString("\n")
}
return body.String()
}
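
Rendered, each stream becomes one line keyed by the short service name, so a three-service deploy might look roughly like this (illustrative output, not captured from a real run):

```
app: succeeded (retries: 0, healthcheck: healthy)
db: starting (retries: 1, healthcheck: starting)
web: succeeded (retries: 0, healthcheck: ?)
```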


@@ -3,8 +3,11 @@ package stack // https://github.com/docker/cli/blob/master/cli/command/stack/remove.go
import (
"context"
"fmt"
"os"
"os/signal"
"sort"
"strings"
"time"
"coopcloud.tech/abra/pkg/log"
"github.com/docker/docker/api/types"
@@ -18,57 +21,87 @@ import (
// RunRemove is the swarm implementation of docker stack remove
func RunRemove(ctx context.Context, client *apiclient.Client, opts Remove) error {
var errs []string
for _, namespace := range opts.Namespaces {
services, err := GetStackServices(ctx, client, namespace)
if err != nil {
return err
}
sigIntCh := make(chan os.Signal, 1)
signal.Notify(sigIntCh, os.Interrupt)
defer signal.Stop(sigIntCh)
networks, err := getStackNetworks(ctx, client, namespace)
if err != nil {
return err
}
waitCh := make(chan struct{})
errCh := make(chan error)
var secrets []swarm.Secret
if versions.GreaterThanOrEqualTo(client.ClientVersion(), "1.25") {
secrets, err = getStackSecrets(ctx, client, namespace)
go func() {
var errs []string
for _, namespace := range opts.Namespaces {
services, err := GetStackServices(ctx, client, namespace)
if err != nil {
return err
errCh <- err
return
}
networks, err := getStackNetworks(ctx, client, namespace)
if err != nil {
errCh <- err
return
}
var secrets []swarm.Secret
if versions.GreaterThanOrEqualTo(client.ClientVersion(), "1.25") {
secrets, err = getStackSecrets(ctx, client, namespace)
if err != nil {
errCh <- err
return
}
}
var configs []swarm.Config
if versions.GreaterThanOrEqualTo(client.ClientVersion(), "1.30") {
configs, err = getStackConfigs(ctx, client, namespace)
if err != nil {
errCh <- err
return
}
}
if len(services)+len(networks)+len(secrets)+len(configs) == 0 {
log.Warnf("nothing found in stack: %s", namespace)
continue
}
hasError := removeServices(ctx, client, services)
hasError = removeSecrets(ctx, client, secrets) || hasError
hasError = removeConfigs(ctx, client, configs) || hasError
hasError = removeNetworks(ctx, client, networks) || hasError
if hasError {
errs = append(errs, fmt.Sprintf("failed to remove some resources from stack: %s", namespace))
continue
}
log.Info("polling undeploy status")
timeout, err := waitOnTasks(ctx, client, namespace)
if timeout {
errs = append(errs, err.Error())
} else {
if err != nil {
errs = append(errs, fmt.Sprintf("failed to wait on tasks of stack: %s: %s", namespace, err))
}
}
}
var configs []swarm.Config
if versions.GreaterThanOrEqualTo(client.ClientVersion(), "1.30") {
configs, err = getStackConfigs(ctx, client, namespace)
if err != nil {
return err
}
if len(errs) > 0 {
errCh <- errors.Errorf(strings.Join(errs, "\n"))
return
}
if len(services)+len(networks)+len(secrets)+len(configs) == 0 {
log.Warnf("nothing found in stack: %s", namespace)
continue
}
close(waitCh)
}()
hasError := removeServices(ctx, client, services)
hasError = removeSecrets(ctx, client, secrets) || hasError
hasError = removeConfigs(ctx, client, configs) || hasError
hasError = removeNetworks(ctx, client, networks) || hasError
if hasError {
errs = append(errs, fmt.Sprintf("failed to remove some resources from stack: %s", namespace))
continue
}
err = waitOnTasks(ctx, client, namespace)
if err != nil {
errs = append(errs, fmt.Sprintf("failed to wait on tasks of stack: %s: %s", namespace, err))
}
}
if len(errs) > 0 {
return errors.Errorf(strings.Join(errs, "\n"))
select {
case <-waitCh:
return nil
case <-sigIntCh:
return fmt.Errorf("skipping as requested, undeploy still in progress 🟠")
case err := <-errCh:
return err
}
return nil
@@ -88,7 +121,7 @@ func removeServices(
var hasError bool
sort.Slice(services, sortServiceByName(services))
for _, service := range services {
log.Infof("removing service %s", service.Spec.Name)
log.Debugf("removing service %s", service.Spec.Name)
if err := client.ServiceRemove(ctx, service.ID); err != nil {
hasError = true
log.Fatalf("failed to remove service %s: %s", service.ID, err)
@@ -104,7 +137,7 @@
) bool {
var hasError bool
for _, network := range networks {
log.Infof("removing network %s", network.Name)
log.Debugf("removing network %s", network.Name)
if err := client.NetworkRemove(ctx, network.ID); err != nil {
hasError = true
log.Fatalf("failed to remove network %s: %s", network.ID, err)
@@ -120,7 +153,7 @@
) bool {
var hasError bool
for _, secret := range secrets {
log.Infof("removing secret %s", secret.Spec.Name)
log.Debugf("removing secret %s", secret.Spec.Name)
if err := client.SecretRemove(ctx, secret.ID); err != nil {
hasError = true
log.Fatalf("Failed to remove secret %s: %s", secret.ID, err)
@@ -136,7 +169,7 @@
) bool {
var hasError bool
for _, config := range configs {
log.Infof("removing config %s", config.Spec.Name)
log.Debugf("removing config %s", config.Spec.Name)
if err := client.ConfigRemove(ctx, config.ID); err != nil {
hasError = true
log.Fatalf("failed to remove config %s: %s", config.ID, err)
@@ -170,12 +203,23 @@ func terminalState(state swarm.TaskState) bool {
return numberedStates[state] > numberedStates[swarm.TaskStateRunning]
}
func waitOnTasks(ctx context.Context, client apiclient.APIClient, namespace string) error {
func waitOnTasks(ctx context.Context, client apiclient.APIClient, namespace string) (bool, error) {
var timedOut bool
log.Debugf("waiting on undeploy tasks (timeout=%v secs)", WaitTimeout)
go func() {
t := time.Duration(WaitTimeout) * time.Second
<-time.After(t)
log.Debug("timed out on undeploy")
timedOut = true
}()
terminalStatesReached := 0
for {
tasks, err := getStackTasks(ctx, client, namespace)
if err != nil {
return fmt.Errorf("failed to get tasks: %w", err)
return false, fmt.Errorf("failed to get tasks: %w", err)
}
for _, task := range tasks {
@@ -188,6 +232,11 @@ func waitOnTasks(ctx context.Context, client apiclient.APIClient, namespace stri
if terminalStatesReached == len(tasks) {
break
}
if timedOut {
return true, fmt.Errorf("deployment timed out 🟠")
}
}
return nil
return false, nil
}
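
One caveat worth noting: `timedOut` is a plain bool written by the timer goroutine and read by the polling loop, which the Go race detector would flag; a `select` over `time.After` expresses the same deadline without shared state. A sketch of that shape (names are illustrative, not the commit's code):

```go
package main

import (
	"fmt"
	"time"
)

// waitWithDeadline polls done() until it reports true or timeout elapses.
func waitWithDeadline(timeout time.Duration, done func() bool) error {
	deadline := time.After(timeout)
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-deadline:
			return fmt.Errorf("timed out after %v", timeout)
		case <-tick.C:
			if done() {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	// Pretend all tasks reach a terminal state after ~2s.
	err := waitWithDeadline(3*time.Second, func() bool {
		return time.Since(start) > 2*time.Second
	})
	fmt.Println(err) // <nil>
}
```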


@@ -3,20 +3,20 @@ package stack // https://github.com/docker/cli/blob/master/cli/command/stack/swa
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"time"
stdlibErr "errors"
tea "github.com/charmbracelet/bubbletea"
"coopcloud.tech/abra/pkg/config"
"coopcloud.tech/abra/pkg/log"
"coopcloud.tech/abra/pkg/ui"
"coopcloud.tech/abra/pkg/upstream/convert"
"github.com/docker/cli/cli/command/service/progress"
"github.com/docker/cli/cli/command/stack/formatter"
composetypes "github.com/docker/cli/cli/compose/types"
"github.com/docker/docker/api/types"
@@ -177,7 +177,7 @@ func IsDeployed(ctx context.Context, cl *dockerClient.Client, stackName string)
func pruneServices(ctx context.Context, cl *dockerClient.Client, namespace convert.Namespace, services map[string]struct{}) {
oldServices, err := GetStackServices(ctx, cl, namespace.Name())
if err != nil {
log.Infof("failed to list services: %s", err)
log.Warnf("failed to list services: %s", err)
}
pruneServices := []swarm.Service{}
@@ -191,7 +191,17 @@
}
// RunDeploy is the swarm implementation of docker stack deploy
func RunDeploy(cl *dockerClient.Client, opts Deploy, cfg *composetypes.Config, appName string, dontWait bool) error {
func RunDeploy(
cl *dockerClient.Client,
opts Deploy,
cfg *composetypes.Config,
appName string,
serverName string,
dontWait bool,
filters filters.Args,
) error {
log.Info("initialising deployment")
if err := validateResolveImageFlag(&opts); err != nil {
return err
}
@@ -201,7 +211,16 @@
opts.ResolveImage = ResolveImageNever
}
return deployCompose(context.Background(), cl, opts, cfg, appName, dontWait)
return deployCompose(
context.Background(),
cl,
opts,
cfg,
appName,
serverName,
dontWait,
filters,
)
}
// validateResolveImageFlag validates the opts.resolveImage command line option
@@ -214,7 +233,16 @@
}
}
func deployCompose(ctx context.Context, cl *dockerClient.Client, opts Deploy, config *composetypes.Config, appName string, dontWait bool) error {
func deployCompose(
ctx context.Context,
cl *dockerClient.Client,
opts Deploy,
config *composetypes.Config,
appName string,
serverName string,
dontWait bool,
filters filters.Args,
) error {
namespace := convert.NewNamespace(opts.Namespace)
if opts.Prune {
@@ -255,7 +283,14 @@
return err
}
serviceIDs, err := deployServices(ctx, cl, services, namespace, opts.SendRegistryAuth, opts.ResolveImage)
serviceIDs, err := deployServices(
ctx,
cl,
services,
namespace,
opts.SendRegistryAuth,
opts.ResolveImage,
)
if err != nil {
return err
}
@@ -265,13 +300,16 @@
return nil
}
log.Infof("waiting for %s to deploy... please hold 🤚", appName)
if err := waitOnServices(ctx, cl, serviceIDs, appName); err != nil {
return err
waitOpts := WaitOpts{
Services: serviceIDs,
AppName: appName,
ServerName: serverName,
Filters: filters,
}
log.Infof("successfully deployed %s", appName)
if err := WaitOnServices(ctx, cl, waitOpts); err != nil {
return err
}
return nil
}
@@ -343,7 +381,7 @@ func createConfigs(ctx context.Context, cl *dockerClient.Client, configs []swarm
}
case dockerClient.IsErrNotFound(err):
// config does not exist, then we create a new one.
log.Infof("creating config %s", configSpec.Name)
log.Debugf("creating config %s", configSpec.Name)
if _, err := cl.ConfigCreate(ctx, configSpec); err != nil {
return errors.Wrapf(err, "failed to create config %s", configSpec.Name)
}
@@ -374,7 +412,7 @@ func createNetworks(ctx context.Context, cl *dockerClient.Client, namespace conv
createOpts.Driver = defaultNetworkDriver
}
log.Infof("creating network %s", name)
log.Debugf("creating network %s", name)
if _, err := cl.NetworkCreate(ctx, name, createOpts); err != nil {
return errors.Wrapf(err, "failed to create network %s", name)
}
@@ -388,10 +426,12 @@ func deployServices(
services map[string]swarm.ServiceSpec,
namespace convert.Namespace,
sendAuth bool,
resolveImage string) ([]string, error) {
resolveImage string) ([]ui.ServiceMeta, error) {
var servicesMeta []ui.ServiceMeta
existingServices, err := GetStackServices(ctx, cl, namespace.Name())
if err != nil {
return nil, err
return servicesMeta, err
}
existingServiceMap := make(map[string]swarm.Service)
@@ -399,8 +439,6 @@
existingServiceMap[service.Spec.Name] = service
}
var serviceIDs []string
for internalName, serviceSpec := range services {
var (
name = namespace.Scope(internalName)
@@ -409,7 +447,7 @@
)
if service, exists := existingServiceMap[name]; exists {
log.Infof("updating %s", name)
log.Debugf("updating %s", name)
updateOpts := types.ServiceUpdateOptions{EncodedRegistryAuth: encodedAuth}
@@ -451,9 +489,12 @@
log.Warn(warning)
}
serviceIDs = append(serviceIDs, service.ID)
servicesMeta = append(servicesMeta, ui.ServiceMeta{
Name: name,
ID: service.ID,
})
} else {
log.Infof("creating %s", name)
log.Debugf("creating %s", name)
createOpts := types.ServiceCreateOptions{EncodedRegistryAuth: encodedAuth}
@@ -467,11 +508,14 @@
return nil, errors.Wrapf(err, "failed to create %s", name)
}
serviceIDs = append(serviceIDs, serviceCreateResponse.ID)
servicesMeta = append(servicesMeta, ui.ServiceMeta{
Name: name,
ID: serviceCreateResponse.ID,
})
}
}
return serviceIDs, nil
return servicesMeta, nil
}
func getStackNetworks(ctx context.Context, dockerclient client.APIClient, namespace string) ([]networktypes.Inspect, error) {
@@ -486,67 +530,89 @@ func getStackConfigs(ctx context.Context, dockerclient client.APIClient, namespa
return dockerclient.ConfigList(ctx, types.ConfigListOptions{Filters: getStackFilter(namespace)})
}
func waitOnServices(ctx context.Context, cl *dockerClient.Client, serviceIDs []string, appName string) error {
var errs []error
func timestamp() string {
ts := time.Now().UTC().Format(time.RFC3339)
return strings.Replace(ts, ":", "", -1) // get rid of offensive colons
}
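For example, a deploy at 13:23:09 UTC yields `2025-03-20T132309Z`, keeping the log filename safe on filesystems that reject colons.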
for _, serviceID := range serviceIDs {
if err := WaitOnService(ctx, cl, serviceID, appName); err != nil {
errs = append(errs, fmt.Errorf("%s: %w", serviceID, err))
}
type WaitOpts struct {
AppName string
Filters filters.Args
NoLog bool
Quiet bool
ServerName string
Services []ui.ServiceMeta
}
func WaitOnServices(ctx context.Context, cl *dockerClient.Client, opts WaitOpts) error {
timeout := time.Duration(WaitTimeout) * time.Second
model := ui.DeployInitialModel(ctx, cl, opts.Services, opts.AppName, timeout, opts.Filters)
tui := tea.NewProgram(model)
if !opts.Quiet {
log.Info("polling deployment status")
}
if len(errs) > 0 {
m, err := log.Without(
func() (tea.Model, error) {
return tui.Run()
},
)
if err != nil {
return fmt.Errorf("waitOnServices: error running TUI: %s", err)
}
deployModel := m.(ui.Model)
if deployModel.TimedOut || deployModel.Failed || deployModel.Quit {
var errs []error
if deployModel.Failed {
errs = append(errs, fmt.Errorf("deploy failed 🛑"))
} else if deployModel.TimedOut {
errs = append(errs, fmt.Errorf("deploy timed out 🟠"))
} else {
errs = append(errs, fmt.Errorf("deploy in progress 🟠"))
}
for _, s := range *deployModel.Streams {
if s.Err != nil {
errs = append(errs, fmt.Errorf("%s: %s", s.Name, s.Err))
}
}
if len(*deployModel.Logs) > 0 && !opts.NoLog {
logsPath := filepath.Join(
config.LOGS_DIR,
opts.ServerName,
fmt.Sprintf("%s_%s", opts.AppName, timestamp()),
)
if err := os.MkdirAll(filepath.Join(config.LOGS_DIR, opts.ServerName), 0764); err != nil {
return fmt.Errorf("waitOnServices: error creating log dir: %s", err)
}
file, err := os.Create(logsPath)
if err != nil {
return fmt.Errorf("waitOnServices: error opening file: %s", err)
}
defer file.Close()
s := strings.Join(*deployModel.Logs, "\n")
if _, err := file.WriteString(s); err != nil {
return fmt.Errorf("waitOnServices: writeFile: %s", err)
}
errs = append(errs, fmt.Errorf("logs: %s", logsPath))
}
return stdlibErr.Join(errs...)
}
return nil
}
// https://github.com/docker/cli/blob/master/cli/command/service/helpers.go
// https://github.com/docker/cli/blob/master/cli/command/service/progress/progress.go
func WaitOnService(ctx context.Context, cl *dockerClient.Client, serviceID, appName string) error {
errChan := make(chan error, 1)
pipeReader, pipeWriter := io.Pipe()
sigintChannel := make(chan os.Signal, 1)
signal.Notify(sigintChannel, os.Interrupt)
defer signal.Stop(sigintChannel)
go func() {
errChan <- progress.ServiceProgress(ctx, cl, serviceID, pipeWriter)
}()
go io.Copy(ioutil.Discard, pipeReader)
timeout := time.Duration(WaitTimeout) * time.Second
select {
case err := <-errChan:
return err
case <-sigintChannel:
return fmt.Errorf(`
Not waiting for %s to deploy. The deployment is ongoing...
If you want to stop the deployment, try:
abra app undeploy %s`, appName, appName)
case <-time.After(timeout):
return fmt.Errorf(`
%s has not converged (%s second timeout reached).
This does not necessarily mean your deployment has failed, it may just be that
the app is taking longer to deploy based on your server resources or network
latency.
You can track latest deployment status with:
abra app ps %s
And inspect the logs with:
abra app logs %s
`, appName, timeout, appName, appName)
if !opts.Quiet {
log.Info("deploy succeeded 🟢")
}
return nil
}
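
Putting it together, a caller hands the metadata returned by `deployServices` to the poller. A sketch under the same assumptions as above (the client, filters, names, and the `pkg/upstream/stack` import path are placeholders or best guesses, not verbatim from the commit):

```go
package main

import (
	"context"

	"coopcloud.tech/abra/pkg/log"
	"coopcloud.tech/abra/pkg/ui"
	stack "coopcloud.tech/abra/pkg/upstream/stack"
	"github.com/docker/docker/api/types/filters"
	dockerClient "github.com/docker/docker/client"
)

func main() {
	cl, err := dockerClient.NewClientWithOpts(dockerClient.FromEnv)
	if err != nil {
		log.Fatalf("%s", err)
	}

	f := filters.NewArgs()
	f.Add("label", "com.docker.stack.namespace=myapp_example_com") // placeholder

	err = stack.WaitOnServices(context.Background(), cl, stack.WaitOpts{
		AppName:    "myapp_example_com",
		ServerName: "example.com",
		Filters:    f,
		// Normally the return value of deployServices; hard-coded here.
		Services: []ui.ServiceMeta{{Name: "myapp_example_com_app", ID: "svc-id"}},
	})
	if err != nil {
		log.Fatalf("%s", err)
	}
}
```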
// Copypasta from https://github.com/docker/cli/blob/master/cli/command/stack/swarm/list.go