Files
docker-cli/cli/command/service/progress/progress.go
Aaron Lehmann d3d09f67b1 progress: Show progress of replicated tasks before they are assigned
This was only showing tasks that belong to nodes that are currently up,
so that tasks on down nodes don't appear to be stuck. But this
unintentionally excludes tasks that haven't been assigned yet, so if a
task is stuck before assignment, for example because no nodes meet its
constraints, a progress bar won't even be shown. The check should only
apply to tasks that have a node assignment.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2017-06-23 17:34:59 -07:00

416 lines
11 KiB
Go

package progress
import (
"errors"
"fmt"
"io"
"os"
"os/signal"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"golang.org/x/net/context"
)
// numberedStates gives every task state that can be drawn on a progress bar
// its ordinal position, counting from 1. Looking up a state that is not
// listed here yields 0.
var numberedStates = map[swarm.TaskState]int64{
	swarm.TaskStateNew:       1,
	swarm.TaskStateAllocated: 2,
	swarm.TaskStatePending:   3,
	swarm.TaskStateAssigned:  4,
	swarm.TaskStateAccepted:  5,
	swarm.TaskStatePreparing: 6,
	swarm.TaskStateReady:     7,
	swarm.TaskStateStarting:  8,
	swarm.TaskStateRunning:   9,
}

// longestState is the length of the longest state name in numberedStates.
// It is filled in by init and used as the padding width when state names
// are written next to a progress bar.
var longestState int
// maxProgress is the Total reported for every per-task progress bar.
const maxProgress = 9

// maxProgressBars is the largest number of tasks for which individual
// progress bars are written; above it only the overall progress line is
// emitted.
const maxProgressBars = 20
// progressUpdater is the rendering strategy used by ServiceProgress. One
// implementation exists per service mode (replicated and global).
type progressUpdater interface {
	// update is called on every polling pass with the freshly inspected
	// service, its up-to-date tasks, the nodes that are not down (keyed by
	// node ID), and whether a rollback is in progress. The caller treats
	// the bool result as "the service has converged".
	update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error)
}
// init records the length of the longest state name in numberedStates in
// longestState, so state names can later be padded to a common width.
func init() {
	for name := range numberedStates {
		if n := len(name); n > longestState {
			longestState = n
		}
	}
}
// stateToProgress converts a task state into the Current value of a progress
// bar. Normally that is the state's number in numberedStates. During a
// rollback the number is subtracted from the size of numberedStates, so the
// bar runs in the opposite direction.
func stateToProgress(state swarm.TaskState, rollback bool) int64 {
	position := numberedStates[state]
	if rollback {
		return int64(len(numberedStates)) - position
	}
	return position
}
// ServiceProgress outputs progress information for convergence of a service.
//
// It repeatedly inspects the service, lists its up-to-date tasks and the
// nodes that are not down, and hands them to a mode-specific updater that
// writes JSON progress messages to progressWriter. Between passes it waits
// 200ms. progressWriter is closed before ServiceProgress returns.
//
// It returns nil when the tasks have stayed converged for the monitoring
// period (the service's UpdateConfig.Monitor when non-zero, otherwise five
// seconds), when the update is reported as completed while convergence had
// not been observed, or when the process receives an interrupt signal.
//
// It returns an error when an API call or the updater fails, when the update
// or rollback is paused, when a rollback completes while convergence had not
// been observed, or when ctx is done while waiting between passes.
// nolint: gocyclo
func ServiceProgress(ctx context.Context, client client.APIClient, serviceID string, progressWriter io.WriteCloser) error {
	defer progressWriter.Close()

	progressOut := streamformatter.NewJSONProgressOutput(progressWriter, false)

	// signal.Notify never blocks when delivering, so the channel needs a
	// buffer to avoid missing an interrupt that arrives mid-pass.
	sigint := make(chan os.Signal, 1)
	signal.Notify(sigint, os.Interrupt)
	defer signal.Stop(sigint)

	taskFilter := filters.NewArgs()
	taskFilter.Add("service", serviceID)
	taskFilter.Add("_up-to-date", "true")

	getUpToDateTasks := func() ([]swarm.Task, error) {
		return client.TaskList(ctx, types.TaskListOptions{Filters: taskFilter})
	}

	var (
		updater     progressUpdater
		converged   bool
		convergedAt time.Time
		monitor     = 5 * time.Second
		rollback    bool
	)

	for {
		service, _, err := client.ServiceInspectWithRaw(ctx, serviceID, types.ServiceInspectOptions{})
		if err != nil {
			return err
		}

		if service.Spec.UpdateConfig != nil && service.Spec.UpdateConfig.Monitor != 0 {
			monitor = service.Spec.UpdateConfig.Monitor
		}

		// The updater is chosen once, from the service mode seen on the
		// first pass.
		if updater == nil {
			updater, err = initializeUpdater(service, progressOut)
			if err != nil {
				return err
			}
		}

		// Note that converged still holds the result of the previous pass
		// at this point.
		if service.UpdateStatus != nil {
			switch service.UpdateStatus.State {
			case swarm.UpdateStateUpdating:
				rollback = false
			case swarm.UpdateStateCompleted:
				if !converged {
					return nil
				}
			case swarm.UpdateStatePaused:
				return fmt.Errorf("service update paused: %s", service.UpdateStatus.Message)
			case swarm.UpdateStateRollbackStarted:
				// Announce the rollback only on the transition into it.
				if !rollback && service.UpdateStatus.Message != "" {
					progressOut.WriteProgress(progress.Progress{
						ID:     "rollback",
						Action: service.UpdateStatus.Message,
					})
				}
				rollback = true
			case swarm.UpdateStateRollbackPaused:
				return fmt.Errorf("service rollback paused: %s", service.UpdateStatus.Message)
			case swarm.UpdateStateRollbackCompleted:
				if !converged {
					return fmt.Errorf("service rolled back: %s", service.UpdateStatus.Message)
				}
			}
		}

		// Tasks have been stable for the whole monitoring period.
		if converged && time.Since(convergedAt) >= monitor {
			return nil
		}

		tasks, err := getUpToDateTasks()
		if err != nil {
			return err
		}

		activeNodes, err := getActiveNodes(ctx, client)
		if err != nil {
			return err
		}

		converged, err = updater.update(service, tasks, activeNodes, rollback)
		if err != nil {
			return err
		}

		if converged {
			// Start the stability clock the first time convergence is seen.
			if convergedAt.IsZero() {
				convergedAt = time.Now()
			}
			wait := monitor - time.Since(convergedAt)
			if wait >= 0 {
				progressOut.WriteProgress(progress.Progress{
					// Ideally this would have no ID, but
					// the progress rendering code behaves
					// poorly on an "action" with no ID. It
					// returns the cursor to the beginning
					// of the line, so the first character
					// may be difficult to read. Then the
					// output is overwritten by the shell
					// prompt when the command finishes.
					ID:     "verify",
					Action: fmt.Sprintf("Waiting %d seconds to verify that tasks are stable...", wait/time.Second+1),
				})
			}
		} else {
			// A non-zero convergedAt means we had converged earlier and
			// have now fallen out of that state.
			if !convergedAt.IsZero() {
				progressOut.WriteProgress(progress.Progress{
					ID:     "verify",
					Action: "Detected task failure",
				})
			}
			convergedAt = time.Time{}
		}

		select {
		case <-time.After(200 * time.Millisecond):
		case <-ctx.Done():
			// Stop promptly on cancellation or deadline instead of waiting
			// for the next API call to fail.
			return ctx.Err()
		case <-sigint:
			if !converged {
				progress.Message(progressOut, "", "Operation continuing in background.")
				progress.Messagef(progressOut, "", "Use `docker service ps %s` to check progress.", serviceID)
			}
			return nil
		}
	}
}
// getActiveNodes lists the swarm's nodes and returns, keyed by node ID, every
// node whose status state is not swarm.NodeStateDown. An error from the node
// listing call is returned unchanged with a nil map.
func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) {
	allNodes, err := client.NodeList(ctx, types.NodeListOptions{})
	if err != nil {
		return nil, err
	}

	active := make(map[string]swarm.Node)
	for _, node := range allNodes {
		if node.Status.State == swarm.NodeStateDown {
			continue
		}
		active[node.ID] = node
	}
	return active, nil
}
// initializeUpdater picks the progressUpdater matching the service's mode: a
// replicatedProgressUpdater when the spec carries a replicated mode with a
// replica count, a globalProgressUpdater when it carries a global mode. Any
// other spec yields an "unrecognized service mode" error.
func initializeUpdater(service swarm.Service, progressOut progress.Output) (progressUpdater, error) {
	mode := service.Spec.Mode
	switch {
	case mode.Replicated != nil && mode.Replicated.Replicas != nil:
		return &replicatedProgressUpdater{progressOut: progressOut}, nil
	case mode.Global != nil:
		return &globalProgressUpdater{progressOut: progressOut}, nil
	default:
		return nil, errors.New("unrecognized service mode")
	}
}
// writeOverallProgress writes the summary line, under the ID
// "overall progress", saying that numerator out of denominator tasks are
// there. During a rollback the text is prefixed with "rolling back update: ".
func writeOverallProgress(progressOut progress.Output, numerator, denominator int, rollback bool) {
	var action string
	if rollback {
		action = fmt.Sprintf("rolling back update: %d out of %d tasks", numerator, denominator)
	} else {
		action = fmt.Sprintf("%d out of %d tasks", numerator, denominator)
	}

	progressOut.WriteProgress(progress.Progress{
		ID:     "overall progress",
		Action: action,
	})
}
// replicatedProgressUpdater is the progressUpdater for services in replicated
// mode. Its zero value, with progressOut set, is ready for use; the remaining
// fields are managed by update.
type replicatedProgressUpdater struct {
	progressOut progress.Output

	// slotMap maps task slot numbers onto a contiguous range starting at 1.
	// The mapped number is used in the progress bar ID, which also makes the
	// bars appear in order.
	slotMap map[int]int

	// initialized is set after the first update call has written the
	// initial output. done is set once every replica has been seen running
	// and cleared again when a selected task is found in another state.
	initialized, done bool
}
// update renders one polling pass for a replicated service and reports
// whether the number of running tasks equals the requested replica count.
//
// One task is selected per slot. Tasks whose desired state is not in
// numberedStates are ignored, as are tasks assigned to a node that is missing
// from activeNodes; tasks without a node assignment are kept. It returns an
// error only when the service spec carries no replica count.
// nolint: gocyclo
func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
	replicated := service.Spec.Mode.Replicated
	if replicated == nil || replicated.Replicas == nil {
		return false, errors.New("no replica count")
	}
	replicas := *replicated.Replicas

	if !u.initialized {
		u.slotMap = make(map[int]int)

		// Emit the overall line first and then one placeholder per
		// replica, so the bars are laid out in order.
		writeOverallProgress(u.progressOut, 0, int(replicas), rollback)
		if replicas <= maxProgressBars {
			for n := uint64(0); n < replicas; n++ {
				progress.Update(u.progressOut, fmt.Sprintf("%d/%d", n+1, replicas), " ")
			}
		}
		u.initialized = true
	}

	// When several tasks share a slot number (possible in restart
	// scenarios), keep the one with the *lowest* desired state.
	selected := make(map[int]swarm.Task)
	for _, candidate := range tasks {
		rank := numberedStates[candidate.DesiredState]
		if rank == 0 {
			continue
		}
		if current, seen := selected[candidate.Slot]; seen && numberedStates[current.DesiredState] <= rank {
			continue
		}
		// Only tasks that already have a node assignment are checked
		// against the set of active nodes.
		if candidate.NodeID != "" {
			if _, nodeUp := activeNodes[candidate.NodeID]; !nodeUp {
				continue
			}
		}
		selected[candidate.Slot] = candidate
	}

	// A previously converged service stays "done" only while every selected
	// task is still running.
	if u.done {
		for _, t := range selected {
			if t.Status.State != swarm.TaskStateRunning {
				u.done = false
				break
			}
		}
	}

	var running uint64
	drawBars := !u.done && replicas <= maxProgressBars
	for _, t := range selected {
		position := u.slotMap[t.Slot]
		if position == 0 {
			// First sighting of this slot: hand out the next position.
			position = len(u.slotMap) + 1
			u.slotMap[t.Slot] = position
		}

		if drawBars && uint64(position) <= replicas {
			u.progressOut.WriteProgress(progress.Progress{
				ID:         fmt.Sprintf("%d/%d", position, replicas),
				Action:     fmt.Sprintf("%-[1]*s", longestState, t.Status.State),
				Current:    stateToProgress(t.Status.State, rollback),
				Total:      maxProgress,
				HideCounts: true,
			})
		}

		if t.Status.State == swarm.TaskStateRunning {
			running++
		}
	}

	converged := running == replicas
	if !u.done {
		writeOverallProgress(u.progressOut, int(running), int(replicas), rollback)
		if converged {
			u.done = true
		}
	}

	return converged, nil
}
// globalProgressUpdater is the progressUpdater for services in global mode.
// Its zero value, with progressOut set, is ready for use; the remaining
// fields are managed by update.
type globalProgressUpdater struct {
	progressOut progress.Output

	// initialized is set once update has seen at least one eligible task
	// and written the first overall progress line. done is set once every
	// counted node has a running task and cleared again when a selected
	// task is found in another state.
	initialized, done bool
}
// update renders one polling pass for a global service and reports whether
// the number of running tasks on active nodes equals the number of nodes
// that have a selected task.
//
// One task is selected per node ID; tasks whose desired state is not in
// numberedStates are ignored. Until at least one such task exists, it writes
// "waiting for new tasks" and reports false. The returned error is always
// nil.
func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) {
	// When several tasks share a node ID (possible in restart scenarios),
	// keep the one with the *lowest* desired state.
	selected := make(map[string]swarm.Task)
	for _, candidate := range tasks {
		rank := numberedStates[candidate.DesiredState]
		if rank == 0 {
			continue
		}
		if current, seen := selected[candidate.NodeID]; seen && numberedStates[current.DesiredState] <= rank {
			continue
		}
		selected[candidate.NodeID] = candidate
	}

	// We don't have perfect knowledge of how many nodes meet the
	// constraints for this service. But the orchestrator creates tasks
	// for all eligible nodes at the same time, so all of those nodes
	// should be represented among the up-to-date tasks.
	nodeCount := len(selected)

	if !u.initialized {
		if nodeCount == 0 {
			// Either the orchestrator has not created the tasks yet, or
			// no node satisfies the service's constraints. Both cases
			// are handled by waiting.
			u.progressOut.WriteProgress(progress.Progress{
				ID:     "overall progress",
				Action: "waiting for new tasks",
			})
			return false, nil
		}

		writeOverallProgress(u.progressOut, 0, nodeCount, rollback)
		u.initialized = true
	}

	// A previously converged service stays "done" only while every selected
	// task is still running.
	if u.done {
		for _, t := range selected {
			if t.Status.State != swarm.TaskStateRunning {
				u.done = false
				break
			}
		}
	}

	running := 0
	drawBars := !u.done && nodeCount <= maxProgressBars
	for _, t := range selected {
		// Tasks on nodes that are not active are neither drawn nor counted.
		node, nodeUp := activeNodes[t.NodeID]
		if !nodeUp {
			continue
		}

		if drawBars {
			u.progressOut.WriteProgress(progress.Progress{
				ID:         stringid.TruncateID(node.ID),
				Action:     fmt.Sprintf("%-[1]*s", longestState, t.Status.State),
				Current:    stateToProgress(t.Status.State, rollback),
				Total:      maxProgress,
				HideCounts: true,
			})
		}

		if t.Status.State == swarm.TaskStateRunning {
			running++
		}
	}

	converged := running == nodeCount
	if !u.done {
		writeOverallProgress(u.progressOut, running, nodeCount, rollback)
		if converged {
			u.done = true
		}
	}

	return converged, nil
}