Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 43 additions & 60 deletions pkg/compose/attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"strings"

"github.com/compose-spec/compose-go/v2/types"
"github.com/docker/cli/cli/streams"
containerType "github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/stdcopy"
"github.com/moby/term"
"github.com/sirupsen/logrus"

"github.com/docker/compose/v5/pkg/api"
"github.com/docker/compose/v5/pkg/utils"
Expand All @@ -49,15 +48,18 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
names = append(names, getContainerNameWithoutProject(c))
}

_, _ = fmt.Fprintf(s.stdout(), "Attaching to %s\n", strings.Join(names, ", "))
_, err = fmt.Fprintf(s.stdout(), "Attaching to %s\n", strings.Join(names, ", "))
if err != nil {
logrus.Debugf("failed to write attach message: %v", err)
}

for _, ctr := range containers {
err := s.attachContainer(ctx, ctr, listener)
if err != nil {
return nil, err
}
}
return containers, err
return containers, nil
}

func (s *composeService) attachContainer(ctx context.Context, container containerType.Summary, listener api.ContainerEventListener) error {
Expand Down Expand Up @@ -91,78 +93,59 @@ func (s *composeService) doAttachContainer(ctx context.Context, service, id, nam
})
})

_, _, err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, nil, wOut, wErr)
return err
}

func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (func(), chan bool, error) {
detached := make(chan bool)
restore := func() { /* noop */ }
if stdin != nil {
in := streams.NewIn(stdin)
if in.IsTerminal() {
state, err := term.SetRawTerminal(in.FD())
if err != nil {
return restore, detached, err
}
restore = func() {
term.RestoreTerminal(in.FD(), state) //nolint:errcheck
}
}
}

streamIn, streamOut, err := s.getContainerStreams(ctx, container)
err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, wOut, wErr)
if err != nil {
return restore, detached, err
return err
}

go func() {
<-ctx.Done()
if stdin != nil {
stdin.Close() //nolint:errcheck
}
}()
return nil
}

if streamIn != nil && stdin != nil {
go func() {
_, err := io.Copy(streamIn, stdin)
var escapeErr term.EscapeError
if errors.As(err, &escapeErr) {
close(detached)
}
}()
func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, stdout, stderr io.WriteCloser) error {
streamOut, err := s.getContainerStreams(ctx, container)
if err != nil {
return err
}

if stdout != nil {
go func() {
defer stdout.Close() //nolint:errcheck
defer stderr.Close() //nolint:errcheck
defer streamOut.Close() //nolint:errcheck
defer func() {
if err := stdout.Close(); err != nil {
logrus.Debugf("failed to close stdout: %v", err)
}
if err := stderr.Close(); err != nil {
logrus.Debugf("failed to close stderr: %v", err)
}
if err := streamOut.Close(); err != nil {
logrus.Debugf("failed to close stream output: %v", err)
}
}()

var err error
if tty {
io.Copy(stdout, streamOut) //nolint:errcheck
_, err = io.Copy(stdout, streamOut)
} else {
stdcopy.StdCopy(stdout, stderr, streamOut) //nolint:errcheck
_, err = stdcopy.StdCopy(stdout, stderr, streamOut)
}
if err != nil && !errors.Is(err, io.EOF) {
logrus.Debugf("stream copy error for container %s: %v", container, err)
}
}()
}
return restore, detached, nil
return nil
}

func (s *composeService) getContainerStreams(ctx context.Context, container string) (io.WriteCloser, io.ReadCloser, error) {
var stdout io.ReadCloser
var stdin io.WriteCloser
func (s *composeService) getContainerStreams(ctx context.Context, container string) (io.ReadCloser, error) {
cnx, err := s.apiClient().ContainerAttach(ctx, container, containerType.AttachOptions{
Stream: true,
Stdin: true,
Stdout: true,
Stderr: true,
Logs: false,
DetachKeys: s.configFile().DetachKeys,
Stream: true,
Stdin: false,
Stdout: true,
Stderr: true,
Logs: false,
})
if err == nil {
stdout = ContainerStdout{HijackedResponse: cnx}
stdin = ContainerStdin{HijackedResponse: cnx}
return stdin, stdout, nil
stdout := ContainerStdout{HijackedResponse: cnx}
return stdout, nil
}

// Fallback to logs API
Expand All @@ -172,7 +155,7 @@ func (s *composeService) getContainerStreams(ctx context.Context, container stri
Follow: true,
})
if err != nil {
return nil, nil, err
return nil, err
}
return stdin, logs, nil
return logs, nil
}