From 6eca53c7aec0ff1a2dccd3d6ad065435d36460d8 Mon Sep 17 00:00:00 2001 From: Josh Hawn Date: Wed, 17 May 2017 11:40:56 -0700 Subject: [PATCH] Refactor holdHijackedConnection It has been refactored to a hijackedIOStreamer type which has several methods which are used to prepare input and handle streaming the input and output separately. Docker-DCO-1.1-Signed-off-by: Josh Hawn (github: jlhawn) --- cli/command/container/attach.go | 13 +- cli/command/container/exec.go | 12 +- cli/command/container/hijack.go | 227 ++++++++++++++++++++------------ cli/command/container/run.go | 16 ++- cli/command/container/start.go | 12 +- 5 files changed, 190 insertions(+), 90 deletions(-) diff --git a/cli/command/container/attach.go b/cli/command/container/attach.go index 16c8897b2a..07df1dea55 100644 --- a/cli/command/container/attach.go +++ b/cli/command/container/attach.go @@ -109,7 +109,18 @@ func runAttach(dockerCli *command.DockerCli, opts *attachOptions) error { logrus.Debugf("Error monitoring TTY size: %s", err) } } - if err := holdHijackedConnection(ctx, dockerCli, c.Config.Tty, options.DetachKeys, in, dockerCli.Out(), dockerCli.Err(), resp); err != nil { + + streamer := hijackedIOStreamer{ + streams: dockerCli, + inputStream: in, + outputStream: dockerCli.Out(), + errorStream: dockerCli.Err(), + resp: resp, + tty: c.Config.Tty, + detachKeys: options.DetachKeys, + } + + if err := streamer.stream(ctx); err != nil { return err } diff --git a/cli/command/container/exec.go b/cli/command/container/exec.go index f3929a1b0a..587cc00f2f 100644 --- a/cli/command/container/exec.go +++ b/cli/command/container/exec.go @@ -146,7 +146,17 @@ func runExec(dockerCli *command.DockerCli, options *execOptions, container strin } defer resp.Close() errCh = promise.Go(func() error { - return holdHijackedConnection(ctx, dockerCli, execConfig.Tty, execConfig.DetachKeys, in, out, stderr, resp) + streamer := hijackedIOStreamer{ + streams: dockerCli, + inputStream: in, + outputStream: out, + errorStream: stderr, + resp: resp, + tty: execConfig.Tty, + detachKeys: execConfig.DetachKeys, + } + + return streamer.stream(ctx) }) if execConfig.Tty && dockerCli.In().IsTerminal() { diff --git a/cli/command/container/hijack.go b/cli/command/container/hijack.go index 6fd0dc7cad..c6815a4d7d 100644 --- a/cli/command/container/hijack.go +++ b/cli/command/container/hijack.go @@ -1,6 +1,7 @@ package container import ( + "fmt" "io" "runtime" "sync" @@ -15,108 +16,166 @@ import ( ) // The default escape key sequence: ctrl-p, ctrl-q +// TODO: This could be moved to `pkg/term`. var defaultEscapeKeys = []byte{16, 17} -// holdHijackedConnection handles copying input to and output from streams to the -// connection -// nolint: gocyclo -func holdHijackedConnection(ctx context.Context, streams command.Streams, tty bool, detachKeys string, inputStream io.ReadCloser, outputStream, errorStream io.Writer, resp types.HijackedResponse) error { - var ( - err error - restoreOnce sync.Once - ) - if inputStream != nil && tty { - if err := setRawTerminal(streams); err != nil { - return err - } - defer func() { - restoreOnce.Do(func() { - restoreTerminal(streams, inputStream) - }) - }() +// A hijackedIOStreamer handles copying input to and output from streams to the +// connection. +type hijackedIOStreamer struct { + streams command.Streams + inputStream io.ReadCloser + outputStream io.Writer + errorStream io.Writer - // Wrap the input to detect detach control sequence. - // Use default detach sequence if an invalid sequence is given. - escapeKeys, err := term.ToBytes(detachKeys) - if len(escapeKeys) == 0 || err != nil { - escapeKeys = defaultEscapeKeys - } + resp types.HijackedResponse - inputStream = ioutils.NewReadCloserWrapper(term.NewEscapeProxy(inputStream, escapeKeys), inputStream.Close) + tty bool + detachKeys string +} + +// stream handles setting up the IO and then begins streaming stdin/stdout +// to/from the hijacked connection, blocking until it is either done reading +// output, the user inputs the detach key sequence when in TTY mode, or when +// the given context is cancelled. +func (h *hijackedIOStreamer) stream(ctx context.Context) error { + restoreInput, err := h.setupInput() + if err != nil { + return fmt.Errorf("unable to setup input stream: %s", err) } - receiveStdout := make(chan error, 1) - if outputStream != nil || errorStream != nil { - go func() { - // When TTY is ON, use regular copy - if tty && outputStream != nil { - _, err = io.Copy(outputStream, resp.Reader) - // we should restore the terminal as soon as possible once connection end - // so any following print messages will be in normal type. - if inputStream != nil { - restoreOnce.Do(func() { - restoreTerminal(streams, inputStream) - }) - } - } else { - _, err = stdcopy.StdCopy(outputStream, errorStream, resp.Reader) - } + defer restoreInput() - logrus.Debug("[hijack] End of stdout") - receiveStdout <- err - }() - } - - stdinDone := make(chan struct{}) - detachedC := make(chan term.EscapeError) - go func() { - if inputStream != nil { - _, inputErr := io.Copy(resp.Conn, inputStream) - // we should restore the terminal as soon as possible once connection end - // so any following print messages will be in normal type. - if tty { - restoreOnce.Do(func() { - restoreTerminal(streams, inputStream) - }) - } - logrus.Debug("[hijack] End of stdin") - - if detached, ok := inputErr.(term.EscapeError); ok { - detachedC <- detached - return - } - } - - if err := resp.CloseWrite(); err != nil { - logrus.Debugf("Couldn't send EOF: %s", err) - } - close(stdinDone) - }() + outputDone := h.beginOutputStream(restoreInput) + inputDone, detached := h.beginInputStream(restoreInput) select { - case err := <-receiveStdout: - if err != nil { - logrus.Debugf("Error receiveStdout: %s", err) - return err - } - case <-stdinDone: - if outputStream != nil || errorStream != nil { + case err := <-outputDone: + return err + case <-inputDone: + // Input stream has closed. + if h.outputStream != nil || h.errorStream != nil { + // Wait for output to complete streaming. select { - case err := <-receiveStdout: - if err != nil { - logrus.Debugf("Error receiveStdout: %s", err) - return err - } + case err := <-outputDone: + return err case <-ctx.Done(): + return ctx.Err() } } - case err := <-detachedC: + return nil + case err := <-detached: // Got a detach key sequence. return err case <-ctx.Done(): + return ctx.Err() + } +} + +func (h *hijackedIOStreamer) setupInput() (restore func(), err error) { + if h.inputStream == nil || !h.tty { + // No need to setup input TTY. + // The restore func is a nop. + return func() {}, nil } - return nil + if err := setRawTerminal(h.streams); err != nil { + return nil, fmt.Errorf("unable to set IO streams as raw terminal: %s", err) + } + + // Use sync.Once so we may call restore multiple times but ensure we + // only restore the terminal once. + var restoreOnce sync.Once + restore = func() { + restoreOnce.Do(func() { + restoreTerminal(h.streams, h.inputStream) + }) + } + + // Wrap the input to detect detach escape sequence. + // Use default escape keys if an invalid sequence is given. + escapeKeys := defaultEscapeKeys + if h.detachKeys != "" { + customEscapeKeys, err := term.ToBytes(h.detachKeys) + if err != nil { + logrus.Warnf("invalid detach escape keys, using default: %s", err) + } else { + escapeKeys = customEscapeKeys + } + } + + h.inputStream = ioutils.NewReadCloserWrapper(term.NewEscapeProxy(h.inputStream, escapeKeys), h.inputStream.Close) + + return restore, nil +} + +func (h *hijackedIOStreamer) beginOutputStream(restoreInput func()) <-chan error { + if h.outputStream == nil && h.errorStream == nil { + // Ther is no need to copy output. + return nil + } + + outputDone := make(chan error) + go func() { + var err error + + // When TTY is ON, use regular copy + if h.outputStream != nil && h.tty { + _, err = io.Copy(h.outputStream, h.resp.Reader) + // We should restore the terminal as soon as possible + // once the connection ends so any following print + // messages will be in normal type. + restoreInput() + } else { + _, err = stdcopy.StdCopy(h.outputStream, h.errorStream, h.resp.Reader) + } + + logrus.Debug("[hijack] End of stdout") + + if err != nil { + logrus.Debugf("Error receiveStdout: %s", err) + } + + outputDone <- err + }() + + return outputDone +} + +func (h *hijackedIOStreamer) beginInputStream(restoreInput func()) (doneC <-chan struct{}, detachedC <-chan error) { + inputDone := make(chan struct{}) + detached := make(chan error) + + go func() { + if h.inputStream != nil { + _, err := io.Copy(h.resp.Conn, h.inputStream) + // We should restore the terminal as soon as possible + // once the connection ends so any following print + // messages will be in normal type. + restoreInput() + + logrus.Debug("[hijack] End of stdin") + + if _, ok := err.(term.EscapeError); ok { + detached <- err + return + } + + if err != nil { + // This error will also occur on the receive + // side (from stdout) where it will be + // propogated back to the caller. + logrus.Debugf("Error sendStdin: %s", err) + } + } + + if err := h.resp.CloseWrite(); err != nil { + logrus.Debugf("Couldn't send EOF: %s", err) + } + + close(inputDone) + }() + + return inputDone, detached } func setRawTerminal(streams command.Streams) error { diff --git a/cli/command/container/run.go b/cli/command/container/run.go index 5cc92ce21f..722ad22e69 100644 --- a/cli/command/container/run.go +++ b/cli/command/container/run.go @@ -176,8 +176,8 @@ func runContainer(dockerCli *command.DockerCli, opts *runOptions, copts *contain //start the container if err := client.ContainerStart(ctx, createResponse.ID, types.ContainerStartOptions{}); err != nil { - // If we have holdHijackedConnection, we should notify - // holdHijackedConnection we are going to exit and wait + // If we have hijackedIOStreamer, we should notify + // hijackedIOStreamer we are going to exit and wait // to avoid the terminal are not restored. if attach { cancelFun() @@ -267,7 +267,17 @@ func attachContainer( } *errCh = promise.Go(func() error { - if errHijack := holdHijackedConnection(ctx, dockerCli, config.Tty, options.DetachKeys, in, out, cerr, resp); errHijack != nil { + streamer := hijackedIOStreamer{ + streams: dockerCli, + inputStream: in, + outputStream: out, + errorStream: cerr, + resp: resp, + tty: config.Tty, + detachKeys: options.DetachKeys, + } + + if errHijack := streamer.stream(ctx); errHijack != nil { return errHijack } return errAttach diff --git a/cli/command/container/start.go b/cli/command/container/start.go index 54ec8344f7..af8255de8b 100644 --- a/cli/command/container/start.go +++ b/cli/command/container/start.go @@ -104,7 +104,17 @@ func runStart(dockerCli *command.DockerCli, opts *startOptions) error { } defer resp.Close() cErr := promise.Go(func() error { - errHijack := holdHijackedConnection(ctx, dockerCli, c.Config.Tty, options.DetachKeys, in, dockerCli.Out(), dockerCli.Err(), resp) + streamer := hijackedIOStreamer{ + streams: dockerCli, + inputStream: in, + outputStream: dockerCli.Out(), + errorStream: dockerCli.Err(), + resp: resp, + tty: c.Config.Tty, + detachKeys: options.DetachKeys, + } + + errHijack := streamer.stream(ctx) if errHijack == nil { return errAttach }