abra/pkg/ui/deploy.go
decentral1se 47045ca8f1
All checks were successful
continuous-integration/drone/push Build is passing
feat: improved deploy progress reporting
See #478
2025-03-23 09:13:36 +00:00

354 lines
6.7 KiB
Go

package ui
import (
"context"
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"time"
"coopcloud.tech/abra/pkg/formatter"
"coopcloud.tech/abra/pkg/logs"
tea "github.com/charmbracelet/bubbletea"
"github.com/docker/cli/cli/command/service/progress"
containerTypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
dockerClient "github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
)
var IsRunning bool
type statusMsg struct {
stream stream
jsonMsg jsonmessage.JSONMessage
}
type progressCompleteMsg struct {
stream stream
failed bool
}
type healthcheckMsg struct {
stream stream
health string
}
type ServiceMeta struct {
Name string
ID string
}
type Model struct {
appName string
cl *dockerClient.Client
count int
ctx context.Context
timeout time.Duration
width int
filters filters.Args
Streams *[]stream
Logs *[]string
Failed bool
TimedOut bool
Quit bool
}
func (m Model) complete() bool {
if m.count == len(*m.Streams) {
return true
}
return false
}
type stream struct {
Name string
Err error
decoder *json.Decoder
id string
reader *io.PipeReader
writer *io.PipeWriter
status string
retries int
health string
rollback bool
}
func (s stream) String() string {
out := fmt.Sprintf("{decoder: %v, ", s.decoder)
out += fmt.Sprintf("err: %v, ", s.Err)
out += fmt.Sprintf("id: %s, ", s.id)
out += fmt.Sprintf("name: %s, ", s.Name)
out += fmt.Sprintf("reader: %v, ", s.reader)
out += fmt.Sprintf("writer: %v, ", s.writer)
out += fmt.Sprintf("status: %s, ", s.status)
return out
}
func (s stream) progress(m Model) tea.Msg {
if err := progress.ServiceProgress(m.ctx, m.cl, s.id, s.writer); err != nil {
return progressCompleteMsg{
stream: s,
failed: true,
}
}
return progressCompleteMsg{stream: s}
}
func (s stream) process() tea.Msg {
var jsonMsg jsonmessage.JSONMessage
if err := s.decoder.Decode(&jsonMsg); err != nil {
if err == io.EOF {
// NOTE(d1): end processing messages
return nil
}
}
return statusMsg{
stream: s,
jsonMsg: jsonMsg,
}
}
func (s stream) healthcheck(m Model) tea.Msg {
filters := filters.NewArgs()
filters.Add("name", fmt.Sprintf("^%s", s.Name))
containers, err := m.cl.ContainerList(m.ctx, containerTypes.ListOptions{Filters: filters})
if err != nil {
s.Err = err
return healthcheckMsg{stream: s}
}
if len(containers) == 0 {
return healthcheckMsg{stream: s}
}
container := containers[0]
containerState, err := m.cl.ContainerInspect(m.ctx, container.ID)
if err != nil {
s.Err = err
return healthcheckMsg{stream: s}
}
var health string
if containerState.State.Health != nil {
health = containerState.State.Health.Status
}
return healthcheckMsg{stream: s, health: health}
}
func DeployInitialModel(
ctx context.Context,
cl *dockerClient.Client,
services []ServiceMeta,
appName string,
timeout time.Duration,
filters filters.Args,
) Model {
var streams []stream
for _, service := range services {
r, w := io.Pipe()
d := json.NewDecoder(r)
streams = append(streams, stream{
Name: service.Name,
id: service.ID,
reader: r,
writer: w,
decoder: d,
retries: -1, // NOTE(d1): skip first attempt
health: "?",
})
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].Name < streams[j].Name
})
return Model{
ctx: ctx,
cl: cl,
appName: appName,
timeout: timeout,
filters: filters,
Streams: &streams,
Logs: &[]string{},
}
}
func (m Model) Init() tea.Cmd {
var cmds []tea.Cmd
for _, stream := range *m.Streams {
cmds = append(
cmds,
[]tea.Cmd{
func() tea.Msg { return stream.progress(m) },
func() tea.Msg { return stream.process() },
func() tea.Msg { return stream.healthcheck(m) },
}...,
)
}
cmds = append(cmds, func() tea.Msg { return deployTimeout(m) })
cmds = append(cmds, func() tea.Msg { return m.gatherLogs() })
return tea.Batch(cmds...)
}
func (m Model) gatherLogs() tea.Msg {
var services []string
for _, s := range *m.Streams {
services = append(services, s.Name)
}
opts := logs.TailOpts{
AppName: m.appName,
Services: services,
StdErr: true,
Buffer: m.Logs,
ToBuffer: true,
Filters: m.filters,
}
// NOTE(d1): not interested in log polling errors. if we don't see logs it
// will hopefully be self-evident based on what happened in the deployment
logs.TailLogs(m.cl, opts)
return nil
}
type timeoutMsg struct{}
func deployTimeout(m Model) tea.Msg {
<-time.After(m.timeout)
return timeoutMsg{}
}
func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
var cmds []tea.Cmd
switch msg := msg.(type) {
case tea.KeyMsg:
switch msg.String() {
case "ctrl+c", "q":
m.Quit = true
return m, tea.Quit
}
case tea.WindowSizeMsg:
m.width = msg.Width
case progressCompleteMsg:
if msg.failed {
m.Failed = true
}
m.count += 1
if m.complete() {
return m, tea.Quit
}
case timeoutMsg:
m.TimedOut = true
return m, tea.Quit
case healthcheckMsg:
for idx, s := range *m.Streams {
if s.id == msg.stream.id {
h := "?"
if s.health != "" {
h = s.health
}
if msg.health != "" {
h = msg.health
}
(*m.Streams)[idx].health = h
}
}
cmds = append(
cmds,
func() tea.Msg { return msg.stream.healthcheck(m) },
)
case statusMsg:
for idx, s := range *m.Streams {
if s.id == msg.stream.id {
if msg.jsonMsg.ID == "rollback" {
m.Failed = true
(*m.Streams)[idx].rollback = true
}
if msg.jsonMsg.ID != "overall progress" {
newStatus := strings.ToLower(msg.jsonMsg.Status)
currentStatus := (*m.Streams)[idx].status
if !strings.Contains(currentStatus, "starting") &&
strings.Contains(newStatus, "starting") {
(*m.Streams)[idx].retries += 1
}
if (*m.Streams)[idx].rollback {
if msg.jsonMsg.ID == "rollback" {
(*m.Streams)[idx].status = newStatus
}
} else {
(*m.Streams)[idx].status = newStatus
}
}
}
}
cmds = append(
cmds,
func() tea.Msg { return msg.stream.process() },
)
}
return m, tea.Batch(cmds...)
}
func (m Model) View() string {
body := strings.Builder{}
for _, stream := range *m.Streams {
split := strings.Split(stream.Name, "_")
short := split[len(split)-1]
status := stream.status
if strings.Contains(stream.status, "converged") && !stream.rollback {
status = "succeeded"
}
if strings.Contains(stream.status, "rolled back") {
status = "rolled back"
}
retries := 0
if stream.retries > 0 {
retries = stream.retries
}
output := fmt.Sprintf("%s: %s (retries: %v, healthcheck: %s)",
formatter.BoldStyle.Render(short),
status,
retries,
stream.health,
)
body.WriteString(output)
body.WriteString("\n")
}
return body.String()
}