forked from toolshed/coop-cloud-backend
305 lines
6.7 KiB
Go
305 lines
6.7 KiB
Go
package status
|
|
import (
|
|
"context"
|
|
"os"
|
|
"io"
|
|
"log"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
"encoding/json"
|
|
|
|
dockerClient "github.com/docker/docker/client"
|
|
containerTypes "github.com/docker/docker/api/types/container"
|
|
"github.com/docker/cli/cli/command/service/progress"
|
|
"github.com/docker/docker/pkg/jsonmessage"
|
|
"github.com/docker/docker/api/types/filters"
|
|
|
|
"coopcloud.tech/abra/pkg/ui"
|
|
)
|
|
type Event interface {
|
|
ServiceID() string
|
|
}
|
|
|
|
type ProgressEvent struct {
|
|
Service ServiceState
|
|
Err error
|
|
Failed bool
|
|
}
|
|
func (e ProgressEvent) ServiceID() string { return e.Service.Id }
|
|
|
|
type HealthEvent struct {
|
|
Service ServiceState
|
|
Err error
|
|
Health string
|
|
}
|
|
func (e HealthEvent) ServiceID() string { return e.Service.Id }
|
|
|
|
type StatusEvent struct {
|
|
Service ServiceState
|
|
Err error
|
|
JsonMsg jsonmessage.JSONMessage
|
|
}
|
|
func (e StatusEvent) ServiceID() string { return e.Service.Id }
|
|
|
|
type StreamEvent struct {
|
|
Type string `json:"type"` // "service" | "done"
|
|
Data interface{} `json:"data"`
|
|
}
|
|
|
|
type ServiceState struct {
|
|
Name string `json:"name"`
|
|
Err error `json:"err"`
|
|
Id string `json:"id"`
|
|
Status string `json:"status"`
|
|
Retries int `json:"retries"`
|
|
Health string `json:"health"`
|
|
Rollback bool `json:"rollback"`
|
|
Failed bool `json:"failed"`
|
|
}
|
|
|
|
type DeployState struct {
|
|
Count int `json:"count"`
|
|
Total int `json:"total"`
|
|
Failed bool `json:"failed"`
|
|
Quit bool `json:"quit"`
|
|
}
|
|
|
|
type DeployMsg int
|
|
const (
|
|
FailMsg DeployMsg = iota
|
|
CompleteMsg
|
|
QuitMsg
|
|
)
|
|
|
|
func (ds DeployState) complete() bool {
|
|
return ds.Count == ds.Total
|
|
}
|
|
|
|
func progressProducer(ctx context.Context, cl *dockerClient.Client, s ServiceState, w *io.PipeWriter, ch chan<- Event) {
|
|
go func() {
|
|
log.Printf("producing progress...")
|
|
err := progress.ServiceProgress(ctx, cl, s.Id, w)
|
|
log.Printf("got some service progress here")
|
|
if err != nil {
|
|
log.Printf("Error in service progress")
|
|
ch <- ProgressEvent{
|
|
Service: s,
|
|
Failed: true,
|
|
Err: err,
|
|
}
|
|
} else {
|
|
ch <- ProgressEvent{
|
|
Service: s,
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func statusProducer(ctx context.Context, decoder *json.Decoder, s ServiceState, ch chan<- Event) {
|
|
go func() {
|
|
log.Printf("producing status...")
|
|
for {
|
|
var msg jsonmessage.JSONMessage
|
|
|
|
if err := decoder.Decode(&msg); err != nil {
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
|
|
ch <- StatusEvent{
|
|
Service: s,
|
|
Err: err,
|
|
}
|
|
return
|
|
}
|
|
log.Printf("Maybe writing JSON message")
|
|
msg.Display(os.Stdout, false);
|
|
ch <- StatusEvent{
|
|
Service: s,
|
|
JsonMsg: msg,
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func healthProducer(ctx context.Context, cl *dockerClient.Client, s ServiceState, ch chan<- Event) {
|
|
go func() {
|
|
log.Printf("producing health...")
|
|
ticker := time.NewTicker(2 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case <-ticker.C:
|
|
health := ""
|
|
|
|
filters := filters.NewArgs()
|
|
filters.Add("name", fmt.Sprintf("^%s", s.Name))
|
|
|
|
containers, err := cl.ContainerList(ctx, containerTypes.ListOptions{Filters: filters})
|
|
if err != nil {
|
|
ch <- HealthEvent{Service: s, Err: err}
|
|
continue
|
|
}
|
|
if len(containers) == 0 {
|
|
ch <- HealthEvent{Service: s}
|
|
continue
|
|
}
|
|
|
|
containerState, err := cl.ContainerInspect(ctx, containers[0].ID)
|
|
if err != nil {
|
|
ch <- HealthEvent{Service: s, Err: err}
|
|
continue
|
|
}
|
|
|
|
if containerState.State.Health != nil {
|
|
health = containerState.State.Health.Status
|
|
}
|
|
ch <- HealthEvent{
|
|
Service: s,
|
|
Health: health,
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func processEvent(ctx context.Context, events <- chan Event, info chan <- DeployMsg, s ServiceState, stream chan <- StreamEvent) error {
|
|
for {
|
|
select {
|
|
case event := <- events:
|
|
if event.ServiceID() != s.Id {
|
|
continue
|
|
}
|
|
switch event := event.(type) {
|
|
case ProgressEvent:
|
|
if event.Err != nil {
|
|
s.Err = event.Err
|
|
log.Printf("Error in progress event: %s", s.Err)
|
|
}
|
|
if event.Failed {
|
|
info <- FailMsg
|
|
}
|
|
// print for debugging purposes
|
|
log.Printf("Service: %s is complete", event.Service.Id)
|
|
// end print
|
|
info <- CompleteMsg
|
|
|
|
case StatusEvent:
|
|
if event.Err != nil {
|
|
s.Err = event.Err
|
|
log.Printf("Error in status event: %s", s.Err)
|
|
}
|
|
|
|
// print for debugging purposes
|
|
b, err := json.MarshalIndent(event.JsonMsg, "", " ")
|
|
if err != nil {
|
|
log.Printf("Problem with json message: %s", err)
|
|
}
|
|
log.Printf(string(b))
|
|
// end print
|
|
|
|
if event.JsonMsg.ID == "rollback" {
|
|
log.Printf("Failed in rollback")
|
|
s.Failed = true
|
|
s.Rollback = true
|
|
}
|
|
|
|
if event.JsonMsg.ID != "overall progress" {
|
|
newStatus := strings.ToLower(event.JsonMsg.Status)
|
|
currentStatus := s.Status
|
|
|
|
if !strings.Contains(currentStatus, "starting") &&
|
|
strings.Contains(newStatus, "starting") {
|
|
s.Retries += 1
|
|
}
|
|
|
|
if s.Rollback {
|
|
if event.JsonMsg.ID == "rollback" {
|
|
s.Status = newStatus
|
|
}
|
|
} else {
|
|
s.Status = newStatus
|
|
}
|
|
}
|
|
|
|
case HealthEvent:
|
|
if event.Err != nil {
|
|
s.Err = event.Err
|
|
log.Printf("Error in health event: %s", s.Err)
|
|
}
|
|
h := "?"
|
|
if s.Health != "" {
|
|
h = s.Health
|
|
}
|
|
if event.Health != "" {
|
|
h = event.Health
|
|
}
|
|
s.Health = h
|
|
}
|
|
stream <- StreamEvent{Type: "service", Data: s}
|
|
case <- ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
func processService(ctx context.Context, info chan <- DeployMsg, cl *dockerClient.Client, s ServiceState, decoder *json.Decoder, writer *io.PipeWriter, stream chan <- StreamEvent) {
|
|
events := make (chan Event, 50)
|
|
progressProducer(ctx, cl, s, writer, events)
|
|
statusProducer(ctx, decoder, s, events)
|
|
healthProducer(ctx, cl, s, events)
|
|
|
|
go processEvent(ctx, events, info, s, stream)
|
|
}
|
|
|
|
func WaitOnServices(pctx context.Context, cl *dockerClient.Client, services []ui.ServiceMeta, filters filters.Args, stream chan <- StreamEvent) {
|
|
ctx, cancel := context.WithCancel(pctx)
|
|
defer cancel()
|
|
log.Printf("What???")
|
|
ds := DeployState{
|
|
Count: 0,
|
|
Total: len(services),
|
|
Failed: false,
|
|
Quit: false,
|
|
}
|
|
info := make (chan DeployMsg, 50)
|
|
for _, service := range services {
|
|
r, w := io.Pipe()
|
|
d := json.NewDecoder(r)
|
|
s := ServiceState{
|
|
Name: service.Name,
|
|
Id: service.ID,
|
|
Retries: -1,
|
|
Health: "?",
|
|
}
|
|
log.Printf("Processing Service: %s", s.Id)
|
|
processService(ctx, info, cl, s, d, w, stream)
|
|
}
|
|
for {
|
|
select {
|
|
case msg := <-info:
|
|
switch msg {
|
|
case FailMsg:
|
|
ds.Failed = true
|
|
case CompleteMsg:
|
|
ds.Count += 1
|
|
if ds.complete() {
|
|
log.Printf("deploy completed")
|
|
cancel()
|
|
}
|
|
case QuitMsg:
|
|
ds.Quit = true
|
|
cancel()
|
|
}
|
|
case <-ctx.Done():
|
|
log.Printf("%v", ds)
|
|
stream <- StreamEvent{Type: "done", Data: ds}
|
|
log.Printf("Context finished because: %s", ctx.Err())
|
|
return
|
|
}
|
|
}
|
|
} |