diff --git a/cli/command/service/progress/progress.go b/cli/command/service/progress/progress.go index d522fc08b4..adff486848 100644 --- a/cli/command/service/progress/progress.go +++ b/cli/command/service/progress/progress.go @@ -6,6 +6,7 @@ import ( "io" "os" "os/signal" + "strings" "time" "github.com/docker/docker/api/types" @@ -29,6 +30,13 @@ var ( swarm.TaskStateReady: 7, swarm.TaskStateStarting: 8, swarm.TaskStateRunning: 9, + + // The following states are not actually shown in progress + // output, but are used internally for ordering. + swarm.TaskStateComplete: 10, + swarm.TaskStateShutdown: 11, + swarm.TaskStateFailed: 12, + swarm.TaskStateRejected: 13, } longestState int @@ -40,22 +48,26 @@ const ( ) type progressUpdater interface { - update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) + update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) } func init() { for state := range numberedStates { - if len(state) > longestState { + if !terminalState(state) && len(state) > longestState { longestState = len(state) } } } +func terminalState(state swarm.TaskState) bool { + return numberedStates[state] > numberedStates[swarm.TaskStateRunning] +} + func stateToProgress(state swarm.TaskState, rollback bool) int64 { if !rollback { return numberedStates[state] } - return int64(len(numberedStates)) - numberedStates[state] + return numberedStates[swarm.TaskStateRunning] - numberedStates[state] } // ServiceProgress outputs progress information for convergence of a service. @@ -192,16 +204,16 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str } } -func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]swarm.Node, error) { +func getActiveNodes(ctx context.Context, client client.APIClient) (map[string]struct{}, error) { nodes, err := client.NodeList(ctx, types.NodeListOptions{}) if err != nil { return nil, err } - activeNodes := make(map[string]swarm.Node) + activeNodes := make(map[string]struct{}) for _, n := range nodes { if n.Status.State != swarm.NodeStateDown { - activeNodes[n.ID] = n + activeNodes[n.ID] = struct{}{} } } return activeNodes, nil @@ -235,6 +247,18 @@ func writeOverallProgress(progressOut progress.Output, numerator, denominator in }) } +func truncError(errMsg string) string { + // Remove newlines from the error, which corrupt the output. + errMsg = strings.Replace(errMsg, "\n", " ", -1) + + // Limit the length to 75 characters, so that even on narrow terminals + // this will not overflow to the next line. + if len(errMsg) > 75 { + errMsg = errMsg[:74] + "…" + } + return errMsg +} + type replicatedProgressUpdater struct { progressOut progress.Output @@ -246,8 +270,7 @@ type replicatedProgressUpdater struct { done bool } -// nolint: gocyclo -func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) { +func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { if service.Spec.Mode.Replicated == nil || service.Spec.Mode.Replicated.Replicas == nil { return false, errors.New("no replica count") } @@ -267,27 +290,7 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. u.initialized = true } - // If there are multiple tasks with the same slot number, favor the one - // with the *lowest* desired state. This can happen in restart - // scenarios. - tasksBySlot := make(map[int]swarm.Task) - for _, task := range tasks { - if numberedStates[task.DesiredState] == 0 { - continue - } - if existingTask, ok := tasksBySlot[task.Slot]; ok { - if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] { - continue - } - } - if task.NodeID != "" { - if _, nodeActive := activeNodes[task.NodeID]; nodeActive { - tasksBySlot[task.Slot] = task - } - } else { - tasksBySlot[task.Slot] = task - } - } + tasksBySlot := u.tasksBySlot(tasks, activeNodes) // If we had reached a converged state, check if we are still converged. if u.done { @@ -308,18 +311,11 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. u.slotMap[task.Slot] = mappedSlot } - if !u.done && replicas <= maxProgressBars && uint64(mappedSlot) <= replicas { - u.progressOut.WriteProgress(progress.Progress{ - ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), - Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), - Current: stateToProgress(task.Status.State, rollback), - Total: maxProgress, - HideCounts: true, - }) - } - if task.Status.State == swarm.TaskStateRunning { + if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning { running++ } + + u.writeTaskProgress(task, mappedSlot, replicas, rollback) } if !u.done { @@ -333,6 +329,62 @@ func (u *replicatedProgressUpdater) update(service swarm.Service, tasks []swarm. return running == replicas, nil } +func (u *replicatedProgressUpdater) tasksBySlot(tasks []swarm.Task, activeNodes map[string]struct{}) map[int]swarm.Task { + // If there are multiple tasks with the same slot number, favor the one + // with the *lowest* desired state. This can happen in restart + // scenarios. + tasksBySlot := make(map[int]swarm.Task) + for _, task := range tasks { + if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { + continue + } + if existingTask, ok := tasksBySlot[task.Slot]; ok { + if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { + continue + } + // If the desired states match, observed state breaks + // ties. This can happen with the "start first" service + // update mode. + if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] && + numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { + continue + } + } + if task.NodeID != "" { + if _, nodeActive := activeNodes[task.NodeID]; !nodeActive { + continue + } + } + tasksBySlot[task.Slot] = task + } + + return tasksBySlot +} + +func (u *replicatedProgressUpdater) writeTaskProgress(task swarm.Task, mappedSlot int, replicas uint64, rollback bool) { + if u.done || replicas > maxProgressBars || uint64(mappedSlot) > replicas { + return + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), + Action: truncError(task.Status.Err), + }) + return + } + + if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", mappedSlot, replicas), + Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), + Current: stateToProgress(task.Status.State, rollback), + Total: maxProgress, + HideCounts: true, + }) + } +} + type globalProgressUpdater struct { progressOut progress.Output @@ -340,22 +392,8 @@ type globalProgressUpdater struct { done bool } -func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]swarm.Node, rollback bool) (bool, error) { - // If there are multiple tasks with the same node ID, favor the one - // with the *lowest* desired state. This can happen in restart - // scenarios. - tasksByNode := make(map[string]swarm.Task) - for _, task := range tasks { - if numberedStates[task.DesiredState] == 0 { - continue - } - if existingTask, ok := tasksByNode[task.NodeID]; ok { - if numberedStates[existingTask.DesiredState] <= numberedStates[task.DesiredState] { - continue - } - } - tasksByNode[task.NodeID] = task - } +func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, rollback bool) (bool, error) { + tasksByNode := u.tasksByNode(tasks) // We don't have perfect knowledge of how many nodes meet the // constraints for this service. But the orchestrator creates tasks @@ -392,19 +430,12 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task running := 0 for _, task := range tasksByNode { - if node, nodeActive := activeNodes[task.NodeID]; nodeActive { - if !u.done && nodeCount <= maxProgressBars { - u.progressOut.WriteProgress(progress.Progress{ - ID: stringid.TruncateID(node.ID), - Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), - Current: stateToProgress(task.Status.State, rollback), - Total: maxProgress, - HideCounts: true, - }) - } - if task.Status.State == swarm.TaskStateRunning { + if _, nodeActive := activeNodes[task.NodeID]; nodeActive { + if !terminalState(task.DesiredState) && task.Status.State == swarm.TaskStateRunning { running++ } + + u.writeTaskProgress(task, nodeCount, rollback) } } @@ -418,3 +449,56 @@ func (u *globalProgressUpdater) update(service swarm.Service, tasks []swarm.Task return running == nodeCount, nil } + +func (u *globalProgressUpdater) tasksByNode(tasks []swarm.Task) map[string]swarm.Task { + // If there are multiple tasks with the same node ID, favor the one + // with the *lowest* desired state. This can happen in restart + // scenarios. + tasksByNode := make(map[string]swarm.Task) + for _, task := range tasks { + if numberedStates[task.DesiredState] == 0 || numberedStates[task.Status.State] == 0 { + continue + } + if existingTask, ok := tasksByNode[task.NodeID]; ok { + if numberedStates[existingTask.DesiredState] < numberedStates[task.DesiredState] { + continue + } + + // If the desired states match, observed state breaks + // ties. This can happen with the "start first" service + // update mode. + if numberedStates[existingTask.DesiredState] == numberedStates[task.DesiredState] && + numberedStates[existingTask.Status.State] <= numberedStates[task.Status.State] { + continue + } + + } + tasksByNode[task.NodeID] = task + } + + return tasksByNode +} + +func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int, rollback bool) { + if u.done || nodeCount > maxProgressBars { + return + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(task.NodeID), + Action: truncError(task.Status.Err), + }) + return + } + + if !terminalState(task.DesiredState) && !terminalState(task.Status.State) { + u.progressOut.WriteProgress(progress.Progress{ + ID: stringid.TruncateID(task.NodeID), + Action: fmt.Sprintf("%-[1]*s", longestState, task.Status.State), + Current: stateToProgress(task.Status.State, rollback), + Total: maxProgress, + HideCounts: true, + }) + } +} diff --git a/cli/command/service/progress/progress_test.go b/cli/command/service/progress/progress_test.go new file mode 100644 index 0000000000..d1c118f7d7 --- /dev/null +++ b/cli/command/service/progress/progress_test.go @@ -0,0 +1,374 @@ +package progress + +import ( + "fmt" + "strconv" + "testing" + + "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/pkg/progress" + "github.com/stretchr/testify/assert" +) + +type mockProgress struct { + p []progress.Progress +} + +func (mp *mockProgress) WriteProgress(p progress.Progress) error { + mp.p = append(mp.p, p) + return nil +} + +func (mp *mockProgress) clear() { + mp.p = nil +} + +type updaterTester struct { + t *testing.T + updater progressUpdater + p *mockProgress + service swarm.Service + activeNodes map[string]struct{} + rollback bool +} + +func (u updaterTester) testUpdater(tasks []swarm.Task, expectedConvergence bool, expectedProgress []progress.Progress) { + u.p.clear() + + converged, err := u.updater.update(u.service, tasks, u.activeNodes, u.rollback) + assert.NoError(u.t, err) + assert.Equal(u.t, expectedConvergence, converged) + assert.Equal(u.t, expectedProgress, u.p.p) +} + +func TestReplicatedProgressUpdaterOneReplica(t *testing.T) { + replicas := uint64(1) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &replicas, + }, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &replicatedProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "0 out of 1 tasks"}, + {ID: "1/1", Action: " "}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // Task with DesiredState beyond Running is ignored + tasks = append(tasks, + swarm.Task{ID: "1", + NodeID: "a", + DesiredState: swarm.TaskStateShutdown, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // Task with valid DesiredState and State updates progress bar + tasks[0].DesiredState = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "new ", Current: 1, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task exposes an error, we should show that instead of the + // progress bar. + tasks[0].Status.Err = "something is wrong" + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "something is wrong"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // When the task reaches running, update should return true + tasks[0].Status.Err = "" + tasks[0].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "1/1", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // If the task fails, update should return false again + tasks[0].Status.Err = "task failed" + tasks[0].Status.State = swarm.TaskStateFailed + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "task failed"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task is restarted, progress output should be shown for the + // replacement task, not the old task. + tasks[0].DesiredState = swarm.TaskStateShutdown + tasks = append(tasks, + swarm.Task{ID: "2", + NodeID: "b", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + }) + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "1/1", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // Add a new task while the current one is still running, to simulate + // "start-then-stop" updates. + tasks = append(tasks, + swarm.Task{ID: "3", + NodeID: "b", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStatePreparing}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "1/1", Action: "preparing", Current: 6, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) +} + +func TestReplicatedProgressUpdaterManyReplicas(t *testing.T) { + replicas := uint64(50) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &replicas, + }, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &replicatedProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + // No per-task progress bars because there are too many replicas + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", replicas)}, + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", replicas)}, + }) + + for i := 0; i != int(replicas); i++ { + tasks = append(tasks, + swarm.Task{ + ID: strconv.Itoa(i), + Slot: i + 1, + NodeID: "a", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + + if i%2 == 1 { + tasks[i].NodeID = "b" + } + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i, replicas)}, + }) + + tasks[i].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, uint64(i) == replicas-1, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i+1, replicas)}, + }) + } +} + +func TestGlobalProgressUpdaterOneNode(t *testing.T) { + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Global: &swarm.GlobalService{}, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &globalProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "waiting for new tasks"}, + }) + + // Task with DesiredState beyond Running is ignored + tasks = append(tasks, + swarm.Task{ID: "1", + NodeID: "a", + DesiredState: swarm.TaskStateShutdown, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "0 out of 1 tasks"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // Task with valid DesiredState and State updates progress bar + tasks[0].DesiredState = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "new ", Current: 1, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task exposes an error, we should show that instead of the + // progress bar. + tasks[0].Status.Err = "something is wrong" + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "something is wrong"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // When the task reaches running, update should return true + tasks[0].Status.Err = "" + tasks[0].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "a", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // If the task fails, update should return false again + tasks[0].Status.Err = "task failed" + tasks[0].Status.State = swarm.TaskStateFailed + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "task failed"}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) + + // If the task is restarted, progress output should be shown for the + // replacement task, not the old task. + tasks[0].DesiredState = swarm.TaskStateShutdown + tasks = append(tasks, + swarm.Task{ID: "2", + NodeID: "a", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + }) + updaterTester.testUpdater(tasks, true, + []progress.Progress{ + {ID: "a", Action: "running ", Current: 9, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "1 out of 1 tasks"}, + }) + + // Add a new task while the current one is still running, to simulate + // "start-then-stop" updates. + tasks = append(tasks, + swarm.Task{ID: "3", + NodeID: "a", + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStatePreparing}, + }) + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "a", Action: "preparing", Current: 6, Total: 9, HideCounts: true}, + {ID: "overall progress", Action: "0 out of 1 tasks"}, + }) +} + +func TestGlobalProgressUpdaterManyNodes(t *testing.T) { + nodes := 50 + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + Global: &swarm.GlobalService{}, + }, + }, + } + + p := &mockProgress{} + updaterTester := updaterTester{ + t: t, + updater: &globalProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{}, + service: service, + } + + for i := 0; i != nodes; i++ { + updaterTester.activeNodes[strconv.Itoa(i)] = struct{}{} + } + + tasks := []swarm.Task{} + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: "waiting for new tasks"}, + }) + + for i := 0; i != nodes; i++ { + tasks = append(tasks, + swarm.Task{ + ID: "task" + strconv.Itoa(i), + NodeID: strconv.Itoa(i), + DesiredState: swarm.TaskStateRunning, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + }) + } + + updaterTester.testUpdater(tasks, false, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", nodes)}, + {ID: "overall progress", Action: fmt.Sprintf("0 out of %d tasks", nodes)}, + }) + + for i := 0; i != nodes; i++ { + tasks[i].Status.State = swarm.TaskStateRunning + updaterTester.testUpdater(tasks, i == nodes-1, + []progress.Progress{ + {ID: "overall progress", Action: fmt.Sprintf("%d out of %d tasks", i+1, nodes)}, + }) + } +}