diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-08 13:11:59 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-21 15:20:39 -0600 |
| commit | 2ddcc34ca455973598f5693d64103deea41d8d79 (patch) | |
| tree | 0b3a42aa97bca93c15c67a679c903611e5ab60c1 /vendor/github.com/testcontainers/testcontainers-go/docker.go | |
| parent | 16c27cd885b9c0d1241dfead3120643f0e8c556c (diff) | |
chore: use minit to start processes from Procfile
Diffstat (limited to 'vendor/github.com/testcontainers/testcontainers-go/docker.go')
| -rw-r--r-- | vendor/github.com/testcontainers/testcontainers-go/docker.go | 139 |
1 files changed, 83 insertions, 56 deletions
diff --git a/vendor/github.com/testcontainers/testcontainers-go/docker.go b/vendor/github.com/testcontainers/testcontainers-go/docker.go index c578796..e20026c 100644 --- a/vendor/github.com/testcontainers/testcontainers-go/docker.go +++ b/vendor/github.com/testcontainers/testcontainers-go/docker.go @@ -5,6 +5,7 @@ import ( "bufio" "context" "encoding/base64" + "encoding/binary" "encoding/json" "errors" "fmt" @@ -15,18 +16,19 @@ import ( "os" "path/filepath" "regexp" + "slices" "sync" "time" "github.com/cenkalti/backoff/v4" + "github.com/containerd/errdefs" "github.com/containerd/platforms" - "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/build" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/image" "github.com/docker/docker/api/types/network" "github.com/docker/docker/client" - "github.com/docker/docker/errdefs" "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/stdcopy" "github.com/docker/go-connections/nat" @@ -79,6 +81,7 @@ type DockerContainer struct { provider *DockerProvider sessionID string terminationSignal chan bool + consumersMtx sync.Mutex // protects consumers consumers []LogConsumer // TODO: Remove locking and wait group once the deprecated StartLogProducer and @@ -139,7 +142,8 @@ func (c *DockerContainer) Endpoint(ctx context.Context, proto string) (string, e } // PortEndpoint gets proto://host:port string for the given exposed port -// Will returns just host:port if proto is "" +// It returns proto://host:port or proto://[IPv6host]:port string for the given exposed port. +// It returns just host:port or [IPv6host]:port if proto is blank. func (c *DockerContainer) PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) { host, err := c.Host(ctx) if err != nil { @@ -151,12 +155,12 @@ func (c *DockerContainer) PortEndpoint(ctx context.Context, port nat.Port, proto return "", err } - protoFull := "" - if proto != "" { - protoFull = proto + "://" + hostPort := net.JoinHostPort(host, outerPort.Port()) + if proto == "" { + return hostPort, nil } - return fmt.Sprintf("%s%s:%s", protoFull, host, outerPort.Port()), nil + return proto + "://" + hostPort, nil } // Host gets host (ip or name) of the docker daemon where the container port is exposed @@ -205,7 +209,7 @@ func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Po return nat.NewPort(k.Proto(), p[0].HostPort) } - return "", errdefs.NotFound(fmt.Errorf("port %q not found", port)) + return "", errdefs.ErrNotFound.WithMessage(fmt.Sprintf("port %q not found", port)) } // Deprecated: use c.Inspect(ctx).NetworkSettings.Ports instead. @@ -364,8 +368,6 @@ func (c *DockerContainer) inspectRawContainer(ctx context.Context) (*container.I // Logs will fetch both STDOUT and STDERR from the current container. Returns a // ReadCloser and leaves it up to the caller to extract what it wants. func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) { - const streamHeaderSize = 8 - options := container.LogsOptions{ ShowStdout: true, ShowStderr: true, @@ -377,42 +379,43 @@ func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) { } defer c.provider.Close() + resp, err := c.Inspect(ctx) + if err != nil { + return nil, err + } + + if resp.Config.Tty { + return rc, nil + } + + return c.parseMultiplexedLogs(rc), nil +} + +// parseMultiplexedLogs handles the multiplexed log format used when TTY is disabled +func (c *DockerContainer) parseMultiplexedLogs(rc io.ReadCloser) io.ReadCloser { + const streamHeaderSize = 8 + pr, pw := io.Pipe() r := bufio.NewReader(rc) go func() { - lineStarted := true - for err == nil { - line, isPrefix, err := r.ReadLine() - - if lineStarted && len(line) >= streamHeaderSize { - line = line[streamHeaderSize:] // trim stream header - lineStarted = false - } - if !isPrefix { - lineStarted = true - } - - _, errW := pw.Write(line) - if errW != nil { + header := make([]byte, streamHeaderSize) + for { + _, errH := io.ReadFull(r, header) + if errH != nil { + _ = pw.CloseWithError(errH) return } - if !isPrefix { - _, errW := pw.Write([]byte("\n")) - if errW != nil { - return - } - } - - if err != nil { - _ = pw.CloseWithError(err) + frameSize := binary.BigEndian.Uint32(header[4:]) + if _, err := io.CopyN(pw, r, int64(frameSize)); err != nil { + pw.CloseWithError(err) return } } }() - return pr, nil + return pr } // Deprecated: use the ContainerRequest.LogConsumerConfig field instead. @@ -423,9 +426,29 @@ func (c *DockerContainer) FollowOutput(consumer LogConsumer) { // followOutput adds a LogConsumer to be sent logs from the container's // STDOUT and STDERR func (c *DockerContainer) followOutput(consumer LogConsumer) { + c.consumersMtx.Lock() + defer c.consumersMtx.Unlock() + c.consumers = append(c.consumers, consumer) } +// consumersCopy returns a copy of the current consumers. +func (c *DockerContainer) consumersCopy() []LogConsumer { + c.consumersMtx.Lock() + defer c.consumersMtx.Unlock() + + return slices.Clone(c.consumers) +} + +// resetConsumers resets the current consumers to the provided ones. +func (c *DockerContainer) resetConsumers(consumers []LogConsumer) { + c.consumersMtx.Lock() + defer c.consumersMtx.Unlock() + + c.consumers = c.consumers[:0] + c.consumers = append(c.consumers, consumers...) +} + // Deprecated: use c.Inspect(ctx).Name instead. // Name gets the name of the container. func (c *DockerContainer) Name(ctx context.Context) (string, error) { @@ -760,8 +783,10 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro } // Setup the log writers. - stdout := newLogConsumerWriter(StdoutLog, c.consumers) - stderr := newLogConsumerWriter(StderrLog, c.consumers) + + consumers := c.consumersCopy() + stdout := newLogConsumerWriter(StdoutLog, consumers) + stderr := newLogConsumerWriter(StderrLog, consumers) // Setup the log production context which will be used to stop the log production. c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx) @@ -977,22 +1002,22 @@ var _ ContainerProvider = (*DockerProvider)(nil) // BuildImage will build and image from context and Dockerfile, then return the tag func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (string, error) { - var buildOptions types.ImageBuildOptions + var buildOptions build.ImageBuildOptions resp, err := backoff.RetryNotifyWithData( - func() (types.ImageBuildResponse, error) { + func() (build.ImageBuildResponse, error) { var err error buildOptions, err = img.BuildOptions() if err != nil { - return types.ImageBuildResponse{}, backoff.Permanent(fmt.Errorf("build options: %w", err)) + return build.ImageBuildResponse{}, backoff.Permanent(fmt.Errorf("build options: %w", err)) } defer tryClose(buildOptions.Context) // release resources in any case resp, err := p.client.ImageBuild(ctx, buildOptions.Context, buildOptions) if err != nil { if isPermanentClientError(err) { - return types.ImageBuildResponse{}, backoff.Permanent(fmt.Errorf("build image: %w", err)) + return build.ImageBuildResponse{}, backoff.Permanent(fmt.Errorf("build image: %w", err)) } - return types.ImageBuildResponse{}, err + return build.ImageBuildResponse{}, err } defer p.Close() @@ -1037,13 +1062,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque // as container won't be attached to it automatically // in case of Podman the bridge network is called 'podman' as 'bridge' would conflict if defaultNetwork != p.defaultBridgeNetworkName { - isAttached := false - for _, net := range req.Networks { - if net == defaultNetwork { - isAttached = true - break - } - } + isAttached := slices.Contains(req.Networks, defaultNetwork) if !isAttached { req.Networks = append(req.Networks, defaultNetwork) @@ -1121,7 +1140,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque } else { img, err := p.client.ImageInspect(ctx, imageName) if err != nil { - if !client.IsErrNotFound(err) { + if !errdefs.IsNotFound(err) { return nil, err } shouldPullImage = true @@ -1290,7 +1309,7 @@ func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string) } if c == nil { - return nil, errdefs.NotFound(fmt.Errorf("container %s not found", name)) + return nil, errdefs.ErrNotFound.WithMessage(fmt.Sprintf("container %s not found", name)) } return c, nil }, @@ -1364,11 +1383,19 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain lifecycleHooks: []ContainerLifecycleHooks{combineContainerHooks(defaultHooks, req.LifecycleHooks)}, } + // Workaround for https://github.com/moby/moby/issues/50133. + // /containers/{id}/json API endpoint of Docker Engine takes data about container from master (not replica) database + // which is synchronized with container state after call of /containers/{id}/stop API endpoint. + dcState, err := dc.State(ctx) + if err != nil { + return nil, fmt.Errorf("docker container state: %w", err) + } + // If a container was stopped programmatically, we want to ensure the container // is running again, but only if it is not paused, as it's not possible to start // a paused container. The Docker Engine returns the "cannot start a paused container, // try unpause instead" error. - switch c.State { + switch dcState.Status { case "running": // cannot re-start a running container, but we still need // to call the startup hooks. @@ -1401,7 +1428,7 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain func (p *DockerProvider) attemptToPullImage(ctx context.Context, tag string, pullOpt image.PullOptions) error { registry, imageAuth, err := DockerImageAuth(ctx, tag) if err != nil { - p.Logger.Printf("Failed to get image auth for %s. Setting empty credentials for the image: %s. Error is: %s", registry, tag, err) + p.Logger.Printf("No image auth found for %s. Setting empty credentials for the image: %s. This is expected for public images. Details: %s", registry, tag, err) } else { // see https://github.com/docker/docs/blob/e8e1204f914767128814dca0ea008644709c117f/engine/api/sdk/examples.md?plain=1#L649-L657 encodedJSON, err := json.Marshal(imageAuth) @@ -1437,7 +1464,7 @@ func (p *DockerProvider) attemptToPullImage(ctx context.Context, tag string, pul defer pull.Close() // download of docker image finishes at EOF of the pull request - _, err = io.ReadAll(pull) + _, err = io.Copy(io.Discard, pull) return err } @@ -1791,11 +1818,11 @@ func (p *DockerProvider) PullImage(ctx context.Context, img string) error { var permanentClientErrors = []func(error) bool{ errdefs.IsNotFound, - errdefs.IsInvalidParameter, + errdefs.IsInvalidArgument, errdefs.IsUnauthorized, - errdefs.IsForbidden, + errdefs.IsPermissionDenied, errdefs.IsNotImplemented, - errdefs.IsSystem, + errdefs.IsInternal, } func isPermanentClientError(err error) bool { |
