pkg/command: wrap jsonmessage.DisplayJSONMessagesStream with go context

Allows for the `jsonmessage.DisplayJSONMessagesStream` function
to correctly return when the context is cancelled with the appropriate
reason (`ctx.Error()`) instead of just a nil error.

Follow-up to 30a73ff19c

Signed-off-by: Alano Terblanche <18033717+Benehiko@users.noreply.github.com>
Co-authored-by: Paweł Gronowski <pawel.gronowski@docker.com>
This commit is contained in:
Alano Terblanche 2024-12-02 15:35:39 +01:00
parent b462778491
commit 91adb70d6b
No known key found for this signature in database
GPG Key ID: 0E8FACD1BA98DE27
14 changed files with 184 additions and 53 deletions

View File

@ -13,13 +13,13 @@ import (
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/completion"
"github.com/docker/cli/cli/command/image"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/cli/cli/streams"
"github.com/docker/cli/opts"
"github.com/docker/docker/api/types/container"
imagetypes "github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/jsonmessage"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@ -148,7 +148,7 @@ func pullImage(ctx context.Context, dockerCli command.Cli, img string, options *
if options.quiet {
out = streams.NewOut(io.Discard)
}
return jsonmessage.DisplayJSONMessagesToStream(responseBody, out, nil)
return jsonstream.Display(ctx, responseBody, out)
}
type cidFile struct {

View File

@ -230,12 +230,7 @@ func TestRunPullTermination(t *testing.T) {
createContainerFunc: func(config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig,
platform *specs.Platform, containerName string,
) (container.CreateResponse, error) {
select {
case <-ctx.Done():
return container.CreateResponse{}, ctx.Err()
default:
}
return container.CreateResponse{}, fakeNotFound{}
return container.CreateResponse{}, errors.New("shouldn't try to create a container")
},
containerAttachFunc: func(ctx context.Context, containerID string, options container.AttachOptions) (types.HijackedResponse, error) {
return types.HijackedResponse{}, errors.New("shouldn't try to attach to a container")
@ -253,7 +248,6 @@ func TestRunPullTermination(t *testing.T) {
assert.NilError(t, server.Close(), "failed to close imageCreateFunc server")
return
default:
}
assert.NilError(t, enc.Encode(jsonmessage.JSONMessage{
Status: "Downloading",
ID: fmt.Sprintf("id-%d", i),
@ -267,6 +261,7 @@ func TestRunPullTermination(t *testing.T) {
}))
time.Sleep(100 * time.Millisecond)
}
}
}()
attachCh <- struct{}{}
return client, nil
@ -277,7 +272,7 @@ func TestRunPullTermination(t *testing.T) {
cmd := NewRunCommand(fakeCLI)
cmd.SetOut(io.Discard)
cmd.SetErr(io.Discard)
cmd.SetArgs([]string{"foobar:latest"})
cmd.SetArgs([]string{"--pull", "always", "foobar:latest"})
cmdErrC := make(chan error, 1)
go func() {

View File

@ -20,6 +20,8 @@ import (
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/completion"
"github.com/docker/cli/cli/command/image/build"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/cli/cli/streams"
"github.com/docker/cli/opts"
"github.com/docker/docker/api"
"github.com/docker/docker/api/types"
@ -28,7 +30,6 @@ import (
"github.com/docker/docker/builder/remotecontext/urlutil"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter"
"github.com/pkg/errors"
@ -352,7 +353,7 @@ func runBuild(ctx context.Context, dockerCli command.Cli, options buildOptions)
defer response.Body.Close()
imageID := ""
aux := func(msg jsonmessage.JSONMessage) {
aux := func(msg jsonstream.JSONMessage) {
var result types.BuildResult
if err := json.Unmarshal(*msg.Aux, &result); err != nil {
fmt.Fprintf(dockerCli.Err(), "Failed to parse aux message: %s", err)
@ -361,9 +362,9 @@ func runBuild(ctx context.Context, dockerCli command.Cli, options buildOptions)
}
}
err = jsonmessage.DisplayJSONMessagesStream(response.Body, buildBuff, dockerCli.Out().FD(), dockerCli.Out().IsTerminal(), aux)
err = jsonstream.Display(ctx, response.Body, streams.NewOut(buildBuff), jsonstream.WithAuxCallback(aux))
if err != nil {
if jerr, ok := err.(*jsonmessage.JSONError); ok {
if jerr, ok := err.(*jsonstream.JSONError); ok {
// If no error code is set, default to 1
if jerr.Code == 0 {
jerr.Code = 1

View File

@ -8,9 +8,9 @@ import (
"github.com/docker/cli/cli"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/completion"
"github.com/docker/cli/cli/internal/jsonstream"
dockeropts "github.com/docker/cli/opts"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/spf13/cobra"
)
@ -90,5 +90,5 @@ func runImport(ctx context.Context, dockerCli command.Cli, options importOptions
}
defer responseBody.Close()
return jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil)
return jsonstream.Display(ctx, responseBody, dockerCli.Out())
}

View File

@ -8,8 +8,8 @@ import (
"github.com/docker/cli/cli"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/completion"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/moby/sys/sequential"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@ -89,7 +89,7 @@ func runLoad(ctx context.Context, dockerCli command.Cli, opts loadOptions) error
defer response.Body.Close()
if response.Body != nil && response.JSON {
return jsonmessage.DisplayJSONMessagesToStream(response.Body, dockerCli.Out(), nil)
return jsonstream.Display(ctx, response.Body, dockerCli.Out())
}
_, err = io.Copy(dockerCli.Out(), response.Body)

View File

@ -15,11 +15,11 @@ import (
"github.com/docker/cli/cli"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/completion"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/cli/cli/streams"
"github.com/docker/docker/api/types/auxprogress"
"github.com/docker/docker/api/types/image"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/registry"
"github.com/morikuni/aec"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -140,23 +140,23 @@ To push the complete multi-platform image, remove the --platform flag.
defer responseBody.Close()
if !opts.untrusted {
// TODO PushTrustedReference currently doesn't respect `--quiet`
return PushTrustedReference(dockerCli, repoInfo, ref, authConfig, responseBody)
return PushTrustedReference(ctx, dockerCli, repoInfo, ref, authConfig, responseBody)
}
if opts.quiet {
err = jsonmessage.DisplayJSONMessagesToStream(responseBody, streams.NewOut(io.Discard), handleAux(dockerCli))
err = jsonstream.Display(ctx, responseBody, streams.NewOut(io.Discard), jsonstream.WithAuxCallback(handleAux()))
if err == nil {
fmt.Fprintln(dockerCli.Out(), ref.String())
}
return err
}
return jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), handleAux(dockerCli))
return jsonstream.Display(ctx, responseBody, dockerCli.Out(), jsonstream.WithAuxCallback(handleAux()))
}
var notes []string
func handleAux(dockerCli command.Cli) func(jm jsonmessage.JSONMessage) {
return func(jm jsonmessage.JSONMessage) {
func handleAux() func(jm jsonstream.JSONMessage) {
return func(jm jsonstream.JSONMessage) {
b := []byte(*jm.Aux)
var stripped auxprogress.ManifestPushedInsteadOfIndex

View File

@ -10,12 +10,12 @@ import (
"github.com/distribution/reference"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/cli/cli/streams"
"github.com/docker/cli/cli/trust"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/image"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/registry"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
@ -39,20 +39,20 @@ func TrustedPush(ctx context.Context, cli command.Cli, repoInfo *registry.Reposi
defer responseBody.Close()
return PushTrustedReference(cli, repoInfo, ref, authConfig, responseBody)
return PushTrustedReference(ctx, cli, repoInfo, ref, authConfig, responseBody)
}
// PushTrustedReference pushes a canonical reference to the trust server.
//
//nolint:gocyclo
func PushTrustedReference(ioStreams command.Streams, repoInfo *registry.RepositoryInfo, ref reference.Named, authConfig registrytypes.AuthConfig, in io.Reader) error {
func PushTrustedReference(ctx context.Context, ioStreams command.Streams, repoInfo *registry.RepositoryInfo, ref reference.Named, authConfig registrytypes.AuthConfig, in io.Reader) error {
// If it is a trusted push we would like to find the target entry which match the
// tag provided in the function and then do an AddTarget later.
target := &client.Target{}
// Count the times of calling for handleTarget,
// if it is called more that once, that should be considered an error in a trusted push.
cnt := 0
handleTarget := func(msg jsonmessage.JSONMessage) {
handleTarget := func(msg jsonstream.JSONMessage) {
cnt++
if cnt > 1 {
// handleTarget should only be called once. This will be treated as an error.
@ -84,14 +84,14 @@ func PushTrustedReference(ioStreams command.Streams, repoInfo *registry.Reposito
default:
// We want trust signatures to always take an explicit tag,
// otherwise it will act as an untrusted push.
if err := jsonmessage.DisplayJSONMessagesToStream(in, ioStreams.Out(), nil); err != nil {
if err := jsonstream.Display(ctx, in, ioStreams.Out()); err != nil {
return err
}
fmt.Fprintln(ioStreams.Err(), "No tag specified, skipping trust metadata push")
return nil
}
if err := jsonmessage.DisplayJSONMessagesToStream(in, ioStreams.Out(), handleTarget); err != nil {
if err := jsonstream.Display(ctx, in, ioStreams.Out(), jsonstream.WithAuxCallback(handleTarget)); err != nil {
return err
}
@ -283,7 +283,7 @@ func imagePullPrivileged(ctx context.Context, cli command.Cli, imgRefAndAuth tru
if opts.quiet {
out = streams.NewOut(io.Discard)
}
return jsonmessage.DisplayJSONMessagesToStream(responseBody, out, nil)
return jsonstream.Display(ctx, responseBody, out)
}
// TrustedReference returns the canonical trusted reference for an image reference

View File

@ -9,9 +9,9 @@ import (
"github.com/docker/cli/cli"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/image"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/docker/api/types"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/registry"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@ -129,7 +129,7 @@ func runInstall(ctx context.Context, dockerCli command.Cli, opts pluginOptions)
return err
}
defer responseBody.Close()
if err := jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil); err != nil {
if err := jsonstream.Display(ctx, responseBody, dockerCli.Out()); err != nil {
return err
}
fmt.Fprintf(dockerCli.Out(), "Installed plugin %s\n", opts.remote) // todo: return proper values from the API for this result

View File

@ -7,8 +7,8 @@ import (
"github.com/docker/cli/cli"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/image"
"github.com/docker/cli/cli/internal/jsonstream"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/registry"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@ -66,8 +66,8 @@ func runPush(ctx context.Context, dockerCli command.Cli, opts pushOptions) error
defer responseBody.Close()
if !opts.untrusted {
return image.PushTrustedReference(dockerCli, repoInfo, named, authConfig, responseBody)
return image.PushTrustedReference(ctx, dockerCli, repoInfo, named, authConfig, responseBody)
}
return jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil)
return jsonstream.Display(ctx, responseBody, dockerCli.Out())
}

View File

@ -8,8 +8,8 @@ import (
"github.com/distribution/reference"
"github.com/docker/cli/cli"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)
@ -86,7 +86,7 @@ func runUpgrade(ctx context.Context, dockerCli command.Cli, opts pluginOptions)
return err
}
defer responseBody.Close()
if err := jsonmessage.DisplayJSONMessagesToStream(responseBody, dockerCli.Out(), nil); err != nil {
if err := jsonstream.Display(ctx, responseBody, dockerCli.Out()); err != nil {
return err
}
fmt.Fprintf(dockerCli.Out(), "Upgraded plugin %s to %s\n", opts.localName, opts.remote) // todo: return proper values from the API for this result

View File

@ -6,7 +6,7 @@ import (
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/service/progress"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/cli/cli/internal/jsonstream"
)
// WaitOnService waits for the service to converge. It outputs a progress bar,
@ -24,7 +24,7 @@ func WaitOnService(ctx context.Context, dockerCli command.Cli, serviceID string,
return <-errChan
}
err := jsonmessage.DisplayJSONMessagesToStream(pipeReader, dockerCli.Out(), nil)
err := jsonstream.Display(ctx, pipeReader, dockerCli.Out())
if err == nil {
err = <-errChan
}

View File

@ -10,8 +10,8 @@ import (
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/completion"
"github.com/docker/cli/cli/command/swarm/progress"
"github.com/docker/cli/cli/internal/jsonstream"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@ -120,7 +120,7 @@ func attach(ctx context.Context, dockerCli command.Cli, opts caOptions) error {
return <-errChan
}
err := jsonmessage.DisplayJSONMessagesToStream(pipeReader, dockerCli.Out(), nil)
err := jsonstream.Display(ctx, pipeReader, dockerCli.Out())
if err == nil {
err = <-errChan
}

View File

@ -0,0 +1,68 @@
package jsonstream
import (
"context"
"io"
"github.com/docker/docker/pkg/jsonmessage"
)
type (
Stream = jsonmessage.Stream
JSONMessage = jsonmessage.JSONMessage
JSONError = jsonmessage.JSONError
JSONProgress = jsonmessage.JSONProgress
)
type ctxReader struct {
err chan error
r io.Reader
}
func (r *ctxReader) Read(p []byte) (n int, err error) {
select {
case err = <-r.err:
return 0, err
default:
return r.r.Read(p)
}
}
type Options func(*options)
type options struct {
AuxCallback func(JSONMessage)
}
func WithAuxCallback(cb func(JSONMessage)) Options {
return func(o *options) {
o.AuxCallback = cb
}
}
// Display prints the JSON messages from the given reader to the given stream.
//
// It wraps the [jsonmessage.DisplayJSONMessagesStream] function to make it
// "context aware" and appropriately returns why the function was canceled.
//
// It returns an error if the context is canceled, but not if the input reader / stream is closed.
func Display(ctx context.Context, in io.Reader, stream Stream, opts ...Options) error {
if ctx.Err() != nil {
return ctx.Err()
}
ctxReader := &ctxReader{err: make(chan error, 1), r: in}
stopFunc := context.AfterFunc(ctx, func() { ctxReader.err <- ctx.Err() })
defer stopFunc()
o := options{}
for _, opt := range opts {
opt(&o)
}
if err := jsonmessage.DisplayJSONMessagesStream(ctxReader, stream, stream.FD(), stream.IsTerminal(), o.AuxCallback); err != nil {
return err
}
return ctx.Err()
}

View File

@ -0,0 +1,67 @@
package jsonstream
import (
"context"
"encoding/json"
"fmt"
"io"
"testing"
"time"
"github.com/docker/cli/cli/streams"
"gotest.tools/v3/assert"
)
func TestDisplay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
client, server := io.Pipe()
t.Cleanup(func() {
assert.NilError(t, server.Close())
})
go func() {
enc := json.NewEncoder(server)
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
assert.NilError(t, server.Close(), "failed to close jsonmessage server")
return
default:
err := enc.Encode(JSONMessage{
Status: "Downloading",
ID: fmt.Sprintf("id-%d", i),
TimeNano: time.Now().UnixNano(),
Time: time.Now().Unix(),
Progress: &JSONProgress{
Current: int64(i),
Total: 100,
Start: 0,
},
})
if err != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
}
}()
streamCtx, cancelStream := context.WithCancel(context.Background())
t.Cleanup(cancelStream)
done := make(chan error)
go func() {
out := streams.NewOut(io.Discard)
done <- Display(streamCtx, client, out)
}()
cancelStream()
select {
case <-time.After(time.Second * 3):
case err := <-done:
assert.ErrorIs(t, err, context.Canceled)
}
}