forked from toolshed/abra
		
	
		
			
				
	
	
		
			357 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			357 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package ui
 | 
						|
 | 
						|
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"sort"
	"strings"
	"time"

	"coopcloud.tech/abra/pkg/formatter"
	"coopcloud.tech/abra/pkg/i18n"
	"coopcloud.tech/abra/pkg/logs"
	tea "github.com/charmbracelet/bubbletea"
	"github.com/docker/cli/cli/command/service/progress"
	containerTypes "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	dockerClient "github.com/docker/docker/client"
	"github.com/docker/docker/pkg/jsonmessage"
)
 | 
						|
 | 
						|
// IsRunning is a package-level flag that nothing in the visible part of this
// file reads or writes.
// NOTE(review): presumably toggled by callers while the deploy UI is active —
// confirm against the call sites.
var IsRunning bool
 | 
						|
 | 
						|
type statusMsg struct {
 | 
						|
	stream  stream
 | 
						|
	jsonMsg jsonmessage.JSONMessage
 | 
						|
}
 | 
						|
 | 
						|
type progressCompleteMsg struct {
 | 
						|
	stream stream
 | 
						|
	failed bool
 | 
						|
}
 | 
						|
 | 
						|
type healthcheckMsg struct {
 | 
						|
	stream stream
 | 
						|
	health string
 | 
						|
}
 | 
						|
 | 
						|
// ServiceMeta identifies a service to track during a deployment.
type ServiceMeta struct {
	// Name is the service name.
	Name string
	// ID is the service identifier handed to progress tracking.
	ID string
}
 | 
						|
 | 
						|
type Model struct {
 | 
						|
	appName string
 | 
						|
	cl      *dockerClient.Client
 | 
						|
	count   int
 | 
						|
	ctx     context.Context
 | 
						|
	timeout time.Duration
 | 
						|
	width   int
 | 
						|
	filters filters.Args
 | 
						|
 | 
						|
	Streams  *[]stream
 | 
						|
	Logs     *[]string
 | 
						|
	Failed   bool
 | 
						|
	TimedOut bool
 | 
						|
	Quit     bool
 | 
						|
}
 | 
						|
 | 
						|
func (m Model) complete() bool {
 | 
						|
	if m.count == len(*m.Streams) {
 | 
						|
		return true
 | 
						|
	}
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// stream is the per-service deployment state tracked by the Model.
type stream struct {
	// Name is the service name.
	Name string
	// Err holds the error from the most recent failed health lookup.
	Err error

	// decoder reads JSON progress messages from reader.
	decoder *json.Decoder
	// id is the service ID handed to progress tracking.
	id string
	// reader is the read end of the progress pipe.
	reader *io.PipeReader
	// writer is the write end of the progress pipe.
	writer *io.PipeWriter
	// status is the lowercased status of the last accepted message.
	status string
	// retries counts transitions into a "starting" status; it starts at -1
	// so that the first start is not counted.
	retries int
	// health is the last known container health, "?" while unknown.
	health string
	// rollback is set once a message with ID "rollback" has been seen.
	rollback bool
}
 | 
						|
 | 
						|
func (s stream) String() string {
 | 
						|
	out := i18n.G("{decoder: %v, ", s.decoder)
 | 
						|
	out += i18n.G("err: %v, ", s.Err)
 | 
						|
	out += i18n.G("id: %s, ", s.id)
 | 
						|
	out += i18n.G("name: %s, ", s.Name)
 | 
						|
	out += i18n.G("reader: %v, ", s.reader)
 | 
						|
	out += i18n.G("writer: %v, ", s.writer)
 | 
						|
	out += i18n.G("status: %s}", s.status)
 | 
						|
	return out
 | 
						|
}
 | 
						|
 | 
						|
func (s stream) progress(m Model) tea.Msg {
 | 
						|
	if err := progress.ServiceProgress(m.ctx, m.cl, s.id, s.writer); err != nil {
 | 
						|
		return progressCompleteMsg{
 | 
						|
			stream: s,
 | 
						|
			failed: true,
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return progressCompleteMsg{stream: s}
 | 
						|
}
 | 
						|
 | 
						|
func (s stream) process() tea.Msg {
 | 
						|
	var jsonMsg jsonmessage.JSONMessage
 | 
						|
 | 
						|
	if err := s.decoder.Decode(&jsonMsg); err != nil {
 | 
						|
		if err == io.EOF {
 | 
						|
			// NOTE(d1): end processing messages
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return statusMsg{
 | 
						|
		stream:  s,
 | 
						|
		jsonMsg: jsonMsg,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (s stream) healthcheck(m Model) tea.Msg {
 | 
						|
	filters := filters.NewArgs()
 | 
						|
	filters.Add("name", i18n.G("^%s", s.Name))
 | 
						|
 | 
						|
	containers, err := m.cl.ContainerList(m.ctx, containerTypes.ListOptions{Filters: filters})
 | 
						|
	if err != nil {
 | 
						|
		s.Err = err
 | 
						|
		return healthcheckMsg{stream: s}
 | 
						|
	}
 | 
						|
 | 
						|
	if len(containers) == 0 {
 | 
						|
		return healthcheckMsg{stream: s}
 | 
						|
	}
 | 
						|
 | 
						|
	container := containers[0]
 | 
						|
	containerState, err := m.cl.ContainerInspect(m.ctx, container.ID)
 | 
						|
	if err != nil {
 | 
						|
		s.Err = err
 | 
						|
		return healthcheckMsg{stream: s}
 | 
						|
	}
 | 
						|
 | 
						|
	var health string
 | 
						|
	if containerState.State.Health != nil {
 | 
						|
		health = containerState.State.Health.Status
 | 
						|
	}
 | 
						|
 | 
						|
	return healthcheckMsg{stream: s, health: health}
 | 
						|
}
 | 
						|
 | 
						|
func DeployInitialModel(
 | 
						|
	ctx context.Context,
 | 
						|
	cl *dockerClient.Client,
 | 
						|
	services []ServiceMeta,
 | 
						|
	appName string,
 | 
						|
	timeout time.Duration,
 | 
						|
	filters filters.Args,
 | 
						|
) Model {
 | 
						|
	var streams []stream
 | 
						|
	for _, service := range services {
 | 
						|
		r, w := io.Pipe()
 | 
						|
		d := json.NewDecoder(r)
 | 
						|
		streams = append(streams, stream{
 | 
						|
			Name:    service.Name,
 | 
						|
			id:      service.ID,
 | 
						|
			reader:  r,
 | 
						|
			writer:  w,
 | 
						|
			decoder: d,
 | 
						|
			retries: -1, // NOTE(d1): skip first attempt
 | 
						|
			health:  "?",
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	sort.Slice(streams, func(i, j int) bool {
 | 
						|
		return streams[i].Name < streams[j].Name
 | 
						|
	})
 | 
						|
 | 
						|
	return Model{
 | 
						|
		ctx:     ctx,
 | 
						|
		cl:      cl,
 | 
						|
		appName: appName,
 | 
						|
		timeout: timeout,
 | 
						|
		filters: filters,
 | 
						|
		Streams: &streams,
 | 
						|
		Logs:    &[]string{},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (m Model) Init() tea.Cmd {
 | 
						|
	var cmds []tea.Cmd
 | 
						|
 | 
						|
	for _, stream := range *m.Streams {
 | 
						|
		cmds = append(
 | 
						|
			cmds,
 | 
						|
			[]tea.Cmd{
 | 
						|
				func() tea.Msg { return stream.progress(m) },
 | 
						|
				func() tea.Msg { return stream.process() },
 | 
						|
				func() tea.Msg { return stream.healthcheck(m) },
 | 
						|
			}...,
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	if m.timeout != 0 {
 | 
						|
		cmds = append(cmds, func() tea.Msg { return deployTimeout(m) })
 | 
						|
	}
 | 
						|
 | 
						|
	cmds = append(cmds, func() tea.Msg { return m.gatherLogs() })
 | 
						|
 | 
						|
	return tea.Batch(cmds...)
 | 
						|
}
 | 
						|
 | 
						|
func (m Model) gatherLogs() tea.Msg {
 | 
						|
	var services []string
 | 
						|
	for _, s := range *m.Streams {
 | 
						|
		services = append(services, s.Name)
 | 
						|
	}
 | 
						|
 | 
						|
	opts := logs.TailOpts{
 | 
						|
		AppName:  m.appName,
 | 
						|
		Services: services,
 | 
						|
		StdErr:   true,
 | 
						|
		Buffer:   m.Logs,
 | 
						|
		ToBuffer: true,
 | 
						|
		Filters:  m.filters,
 | 
						|
	}
 | 
						|
 | 
						|
	// NOTE(d1): not interested in log polling errors. if we don't see logs it
 | 
						|
	// will hopefully be self-evident based on what happened in the deployment
 | 
						|
	logs.TailLogs(m.cl, opts)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// timeoutMsg is returned by deployTimeout once the model's timeout has
// elapsed; Update reacts to it by setting TimedOut and quitting.
type timeoutMsg struct{}
 | 
						|
 | 
						|
func deployTimeout(m Model) tea.Msg {
 | 
						|
	<-time.After(m.timeout)
 | 
						|
	return timeoutMsg{}
 | 
						|
}
 | 
						|
 | 
						|
func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 | 
						|
	var cmds []tea.Cmd
 | 
						|
 | 
						|
	switch msg := msg.(type) {
 | 
						|
	case tea.KeyMsg:
 | 
						|
		switch msg.String() {
 | 
						|
		case "ctrl+c", "q":
 | 
						|
			m.Quit = true
 | 
						|
			return m, tea.Quit
 | 
						|
		}
 | 
						|
 | 
						|
	case tea.WindowSizeMsg:
 | 
						|
		m.width = msg.Width
 | 
						|
 | 
						|
	case progressCompleteMsg:
 | 
						|
		if msg.failed {
 | 
						|
			m.Failed = true
 | 
						|
		}
 | 
						|
 | 
						|
		m.count += 1
 | 
						|
 | 
						|
		if m.complete() {
 | 
						|
			return m, tea.Quit
 | 
						|
		}
 | 
						|
 | 
						|
	case timeoutMsg:
 | 
						|
		m.TimedOut = true
 | 
						|
		return m, tea.Quit
 | 
						|
 | 
						|
	case healthcheckMsg:
 | 
						|
		for idx, s := range *m.Streams {
 | 
						|
			if s.id == msg.stream.id {
 | 
						|
				h := "?"
 | 
						|
				if s.health != "" {
 | 
						|
					h = s.health
 | 
						|
				}
 | 
						|
				if msg.health != "" {
 | 
						|
					h = msg.health
 | 
						|
				}
 | 
						|
				(*m.Streams)[idx].health = h
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		cmds = append(
 | 
						|
			cmds,
 | 
						|
			func() tea.Msg { return msg.stream.healthcheck(m) },
 | 
						|
		)
 | 
						|
 | 
						|
	case statusMsg:
 | 
						|
		for idx, s := range *m.Streams {
 | 
						|
			if s.id == msg.stream.id {
 | 
						|
 | 
						|
				if msg.jsonMsg.ID == "rollback" {
 | 
						|
					m.Failed = true
 | 
						|
					(*m.Streams)[idx].rollback = true
 | 
						|
				}
 | 
						|
 | 
						|
				if msg.jsonMsg.ID != "overall progress" {
 | 
						|
					newStatus := strings.ToLower(msg.jsonMsg.Status)
 | 
						|
					currentStatus := (*m.Streams)[idx].status
 | 
						|
 | 
						|
					if !strings.Contains(currentStatus, "starting") &&
 | 
						|
						strings.Contains(newStatus, "starting") {
 | 
						|
						(*m.Streams)[idx].retries += 1
 | 
						|
					}
 | 
						|
 | 
						|
					if (*m.Streams)[idx].rollback {
 | 
						|
						if msg.jsonMsg.ID == "rollback" {
 | 
						|
							(*m.Streams)[idx].status = newStatus
 | 
						|
						}
 | 
						|
					} else {
 | 
						|
						(*m.Streams)[idx].status = newStatus
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		cmds = append(
 | 
						|
			cmds,
 | 
						|
			func() tea.Msg { return msg.stream.process() },
 | 
						|
		)
 | 
						|
	}
 | 
						|
 | 
						|
	return m, tea.Batch(cmds...)
 | 
						|
}
 | 
						|
 | 
						|
func (m Model) View() string {
 | 
						|
	body := strings.Builder{}
 | 
						|
 | 
						|
	for _, stream := range *m.Streams {
 | 
						|
		split := strings.Split(stream.Name, "_")
 | 
						|
		short := split[len(split)-1]
 | 
						|
 | 
						|
		status := stream.status
 | 
						|
		if strings.Contains(stream.status, "converged") && !stream.rollback {
 | 
						|
			status = i18n.G("succeeded")
 | 
						|
		}
 | 
						|
		if strings.Contains(stream.status, "rolled back") {
 | 
						|
			status = i18n.G("rolled back")
 | 
						|
		}
 | 
						|
 | 
						|
		retries := 0
 | 
						|
		if stream.retries > 0 {
 | 
						|
			retries = stream.retries
 | 
						|
		}
 | 
						|
 | 
						|
		output := i18n.G("%s: %s (retries: %v, healthcheck: %s)",
 | 
						|
			formatter.BoldStyle.Render(short),
 | 
						|
			status,
 | 
						|
			retries,
 | 
						|
			stream.health,
 | 
						|
		)
 | 
						|
 | 
						|
		body.WriteString(output)
 | 
						|
		body.WriteString("\n")
 | 
						|
	}
 | 
						|
 | 
						|
	return body.String()
 | 
						|
}
 |