forked from toolshed/abra
		
	
		
			
				
	
	
		
			354 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			354 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package ui
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"coopcloud.tech/abra/pkg/formatter"
 | |
| 	"coopcloud.tech/abra/pkg/logs"
 | |
| 	tea "github.com/charmbracelet/bubbletea"
 | |
| 	"github.com/docker/cli/cli/command/service/progress"
 | |
| 	containerTypes "github.com/docker/docker/api/types/container"
 | |
| 	"github.com/docker/docker/api/types/filters"
 | |
| 	dockerClient "github.com/docker/docker/client"
 | |
| 	"github.com/docker/docker/pkg/jsonmessage"
 | |
| )
 | |
| 
 | |
// IsRunning is a package-level flag. Nothing in this file reads or writes it;
// presumably callers toggle it while the deploy UI is on screen — TODO confirm
// against the call sites.
var IsRunning bool
 | |
| 
 | |
| type statusMsg struct {
 | |
| 	stream  stream
 | |
| 	jsonMsg jsonmessage.JSONMessage
 | |
| }
 | |
| 
 | |
| type progressCompleteMsg struct {
 | |
| 	stream stream
 | |
| 	failed bool
 | |
| }
 | |
| 
 | |
| type healthcheckMsg struct {
 | |
| 	stream stream
 | |
| 	health string
 | |
| }
 | |
| 
 | |
// ServiceMeta identifies one service to be tracked by the deploy UI.
type ServiceMeta struct {
	Name string // copied into stream.Name by DeployInitialModel
	ID   string // copied into stream.id and passed to progress.ServiceProgress
}
 | |
| 
 | |
| type Model struct {
 | |
| 	appName string
 | |
| 	cl      *dockerClient.Client
 | |
| 	count   int
 | |
| 	ctx     context.Context
 | |
| 	timeout time.Duration
 | |
| 	width   int
 | |
| 	filters filters.Args
 | |
| 
 | |
| 	Streams  *[]stream
 | |
| 	Logs     *[]string
 | |
| 	Failed   bool
 | |
| 	TimedOut bool
 | |
| 	Quit     bool
 | |
| }
 | |
| 
 | |
| func (m Model) complete() bool {
 | |
| 	if m.count == len(*m.Streams) {
 | |
| 		return true
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
// stream tracks the deployment state of a single service.
type stream struct {
	Name string // service name; shown in View and used to find the service's container
	Err  error  // last error recorded by healthcheck on the copy it returns

	decoder  *json.Decoder  // decodes progress messages from reader
	id       string         // service ID passed to progress.ServiceProgress
	reader   *io.PipeReader // read end of the progress pipe
	writer   *io.PipeWriter // write end handed to progress.ServiceProgress
	status   string         // lower-cased status from the latest relevant message
	retries  int            // incremented each time status newly contains "starting"
	health   string         // latest container health status, "?" when unknown
	rollback bool           // set once a "rollback" message has been seen
}

// String renders the stream for debugging as a single brace-delimited record.
// The decoder, reader and writer are printed with %v, so nil pointers show up
// as "<nil>".
func (s stream) String() string {
	return fmt.Sprintf(
		"{decoder: %v, err: %v, id: %s, name: %s, reader: %v, writer: %v, status: %s}",
		s.decoder, s.Err, s.id, s.Name, s.reader, s.writer, s.status,
	)
}
 | |
| 
 | |
| func (s stream) progress(m Model) tea.Msg {
 | |
| 	if err := progress.ServiceProgress(m.ctx, m.cl, s.id, s.writer); err != nil {
 | |
| 		return progressCompleteMsg{
 | |
| 			stream: s,
 | |
| 			failed: true,
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return progressCompleteMsg{stream: s}
 | |
| }
 | |
| 
 | |
| func (s stream) process() tea.Msg {
 | |
| 	var jsonMsg jsonmessage.JSONMessage
 | |
| 
 | |
| 	if err := s.decoder.Decode(&jsonMsg); err != nil {
 | |
| 		if err == io.EOF {
 | |
| 			// NOTE(d1): end processing messages
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return statusMsg{
 | |
| 		stream:  s,
 | |
| 		jsonMsg: jsonMsg,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s stream) healthcheck(m Model) tea.Msg {
 | |
| 	filters := filters.NewArgs()
 | |
| 	filters.Add("name", fmt.Sprintf("^%s", s.Name))
 | |
| 
 | |
| 	containers, err := m.cl.ContainerList(m.ctx, containerTypes.ListOptions{Filters: filters})
 | |
| 	if err != nil {
 | |
| 		s.Err = err
 | |
| 		return healthcheckMsg{stream: s}
 | |
| 	}
 | |
| 
 | |
| 	if len(containers) == 0 {
 | |
| 		return healthcheckMsg{stream: s}
 | |
| 	}
 | |
| 
 | |
| 	container := containers[0]
 | |
| 	containerState, err := m.cl.ContainerInspect(m.ctx, container.ID)
 | |
| 	if err != nil {
 | |
| 		s.Err = err
 | |
| 		return healthcheckMsg{stream: s}
 | |
| 	}
 | |
| 
 | |
| 	var health string
 | |
| 	if containerState.State.Health != nil {
 | |
| 		health = containerState.State.Health.Status
 | |
| 	}
 | |
| 
 | |
| 	return healthcheckMsg{stream: s, health: health}
 | |
| }
 | |
| 
 | |
| func DeployInitialModel(
 | |
| 	ctx context.Context,
 | |
| 	cl *dockerClient.Client,
 | |
| 	services []ServiceMeta,
 | |
| 	appName string,
 | |
| 	timeout time.Duration,
 | |
| 	filters filters.Args,
 | |
| ) Model {
 | |
| 	var streams []stream
 | |
| 	for _, service := range services {
 | |
| 		r, w := io.Pipe()
 | |
| 		d := json.NewDecoder(r)
 | |
| 		streams = append(streams, stream{
 | |
| 			Name:    service.Name,
 | |
| 			id:      service.ID,
 | |
| 			reader:  r,
 | |
| 			writer:  w,
 | |
| 			decoder: d,
 | |
| 			retries: -1, // NOTE(d1): skip first attempt
 | |
| 			health:  "?",
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	sort.Slice(streams, func(i, j int) bool {
 | |
| 		return streams[i].Name < streams[j].Name
 | |
| 	})
 | |
| 
 | |
| 	return Model{
 | |
| 		ctx:     ctx,
 | |
| 		cl:      cl,
 | |
| 		appName: appName,
 | |
| 		timeout: timeout,
 | |
| 		filters: filters,
 | |
| 		Streams: &streams,
 | |
| 		Logs:    &[]string{},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m Model) Init() tea.Cmd {
 | |
| 	var cmds []tea.Cmd
 | |
| 
 | |
| 	for _, stream := range *m.Streams {
 | |
| 		cmds = append(
 | |
| 			cmds,
 | |
| 			[]tea.Cmd{
 | |
| 				func() tea.Msg { return stream.progress(m) },
 | |
| 				func() tea.Msg { return stream.process() },
 | |
| 				func() tea.Msg { return stream.healthcheck(m) },
 | |
| 			}...,
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	cmds = append(cmds, func() tea.Msg { return deployTimeout(m) })
 | |
| 	cmds = append(cmds, func() tea.Msg { return m.gatherLogs() })
 | |
| 
 | |
| 	return tea.Batch(cmds...)
 | |
| }
 | |
| 
 | |
| func (m Model) gatherLogs() tea.Msg {
 | |
| 	var services []string
 | |
| 	for _, s := range *m.Streams {
 | |
| 		services = append(services, s.Name)
 | |
| 	}
 | |
| 
 | |
| 	opts := logs.TailOpts{
 | |
| 		AppName:  m.appName,
 | |
| 		Services: services,
 | |
| 		StdErr:   true,
 | |
| 		Buffer:   m.Logs,
 | |
| 		ToBuffer: true,
 | |
| 		Filters:  m.filters,
 | |
| 	}
 | |
| 
 | |
| 	// NOTE(d1): not interested in log polling errors. if we don't see logs it
 | |
| 	// will hopefully be self-evident based on what happened in the deployment
 | |
| 	logs.TailLogs(m.cl, opts)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type timeoutMsg struct{}
 | |
| 
 | |
| func deployTimeout(m Model) tea.Msg {
 | |
| 	<-time.After(m.timeout)
 | |
| 	return timeoutMsg{}
 | |
| }
 | |
| 
 | |
| func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 | |
| 	var cmds []tea.Cmd
 | |
| 
 | |
| 	switch msg := msg.(type) {
 | |
| 	case tea.KeyMsg:
 | |
| 		switch msg.String() {
 | |
| 		case "ctrl+c", "q":
 | |
| 			m.Quit = true
 | |
| 			return m, tea.Quit
 | |
| 		}
 | |
| 
 | |
| 	case tea.WindowSizeMsg:
 | |
| 		m.width = msg.Width
 | |
| 
 | |
| 	case progressCompleteMsg:
 | |
| 		if msg.failed {
 | |
| 			m.Failed = true
 | |
| 		}
 | |
| 
 | |
| 		m.count += 1
 | |
| 
 | |
| 		if m.complete() {
 | |
| 			return m, tea.Quit
 | |
| 		}
 | |
| 
 | |
| 	case timeoutMsg:
 | |
| 		m.TimedOut = true
 | |
| 		return m, tea.Quit
 | |
| 
 | |
| 	case healthcheckMsg:
 | |
| 		for idx, s := range *m.Streams {
 | |
| 			if s.id == msg.stream.id {
 | |
| 				h := "?"
 | |
| 				if s.health != "" {
 | |
| 					h = s.health
 | |
| 				}
 | |
| 				if msg.health != "" {
 | |
| 					h = msg.health
 | |
| 				}
 | |
| 				(*m.Streams)[idx].health = h
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		cmds = append(
 | |
| 			cmds,
 | |
| 			func() tea.Msg { return msg.stream.healthcheck(m) },
 | |
| 		)
 | |
| 
 | |
| 	case statusMsg:
 | |
| 		for idx, s := range *m.Streams {
 | |
| 			if s.id == msg.stream.id {
 | |
| 
 | |
| 				if msg.jsonMsg.ID == "rollback" {
 | |
| 					m.Failed = true
 | |
| 					(*m.Streams)[idx].rollback = true
 | |
| 				}
 | |
| 
 | |
| 				if msg.jsonMsg.ID != "overall progress" {
 | |
| 					newStatus := strings.ToLower(msg.jsonMsg.Status)
 | |
| 					currentStatus := (*m.Streams)[idx].status
 | |
| 
 | |
| 					if !strings.Contains(currentStatus, "starting") &&
 | |
| 						strings.Contains(newStatus, "starting") {
 | |
| 						(*m.Streams)[idx].retries += 1
 | |
| 					}
 | |
| 
 | |
| 					if (*m.Streams)[idx].rollback {
 | |
| 						if msg.jsonMsg.ID == "rollback" {
 | |
| 							(*m.Streams)[idx].status = newStatus
 | |
| 						}
 | |
| 					} else {
 | |
| 						(*m.Streams)[idx].status = newStatus
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		cmds = append(
 | |
| 			cmds,
 | |
| 			func() tea.Msg { return msg.stream.process() },
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	return m, tea.Batch(cmds...)
 | |
| }
 | |
| 
 | |
| func (m Model) View() string {
 | |
| 	body := strings.Builder{}
 | |
| 
 | |
| 	for _, stream := range *m.Streams {
 | |
| 		split := strings.Split(stream.Name, "_")
 | |
| 		short := split[len(split)-1]
 | |
| 
 | |
| 		status := stream.status
 | |
| 		if strings.Contains(stream.status, "converged") && !stream.rollback {
 | |
| 			status = "succeeded"
 | |
| 		}
 | |
| 		if strings.Contains(stream.status, "rolled back") {
 | |
| 			status = "rolled back"
 | |
| 		}
 | |
| 
 | |
| 		retries := 0
 | |
| 		if stream.retries > 0 {
 | |
| 			retries = stream.retries
 | |
| 		}
 | |
| 
 | |
| 		output := fmt.Sprintf("%s: %s (retries: %v, healthcheck: %s)",
 | |
| 			formatter.BoldStyle.Render(short),
 | |
| 			status,
 | |
| 			retries,
 | |
| 			stream.health,
 | |
| 		)
 | |
| 
 | |
| 		body.WriteString(output)
 | |
| 		body.WriteString("\n")
 | |
| 	}
 | |
| 
 | |
| 	return body.String()
 | |
| }
 |