summaryrefslogtreecommitdiff
path: root/vendor/github.com/testcontainers/testcontainers-go/parallel.go
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-05-11 21:12:57 -0600
committermo khan <mo@mokhan.ca>2025-05-11 21:12:57 -0600
commit60440f90dca28e99a31dd328c5f6d5dc0f9b6a2e (patch)
tree2f54adf55086516f162f0a55a5347e6b25f7f176 /vendor/github.com/testcontainers/testcontainers-go/parallel.go
parent05ca9b8d3a9c7203a3a3b590beaa400900bd9007 (diff)
chore: vendor go dependencies
Diffstat (limited to 'vendor/github.com/testcontainers/testcontainers-go/parallel.go')
-rw-r--r--vendor/github.com/testcontainers/testcontainers-go/parallel.go110
1 files changed, 110 insertions, 0 deletions
diff --git a/vendor/github.com/testcontainers/testcontainers-go/parallel.go b/vendor/github.com/testcontainers/testcontainers-go/parallel.go
new file mode 100644
index 0000000..0349023
--- /dev/null
+++ b/vendor/github.com/testcontainers/testcontainers-go/parallel.go
@@ -0,0 +1,110 @@
+package testcontainers
+
+import (
+ "context"
+ "fmt"
+ "sync"
+)
+
+const (
+ defaultWorkersCount = 8
+)
+
+type ParallelContainerRequest []GenericContainerRequest
+
+// ParallelContainersOptions represents additional options for parallel running
+type ParallelContainersOptions struct {
+ WorkersCount int // count of parallel workers. If field empty(zero), default value will be 'defaultWorkersCount'
+}
+
+// ParallelContainersRequestError represents error from parallel request
+type ParallelContainersRequestError struct {
+ Request GenericContainerRequest
+ Error error
+}
+
+type ParallelContainersError struct {
+ Errors []ParallelContainersRequestError
+}
+
+func (gpe ParallelContainersError) Error() string {
+ return fmt.Sprintf("%v", gpe.Errors)
+}
+
+// parallelContainersResult represents result.
+type parallelContainersResult struct {
+ ParallelContainersRequestError
+ Container Container
+}
+
+func parallelContainersRunner(
+ ctx context.Context,
+ requests <-chan GenericContainerRequest,
+ results chan<- parallelContainersResult,
+ wg *sync.WaitGroup,
+) {
+ defer wg.Done()
+ for req := range requests {
+ c, err := GenericContainer(ctx, req)
+ res := parallelContainersResult{Container: c}
+ if err != nil {
+ res.Request = req
+ res.Error = err
+ }
+ results <- res
+ }
+}
+
+// ParallelContainers creates a generic containers with parameters and run it in parallel mode
+func ParallelContainers(ctx context.Context, reqs ParallelContainerRequest, opt ParallelContainersOptions) ([]Container, error) {
+ if opt.WorkersCount == 0 {
+ opt.WorkersCount = defaultWorkersCount
+ }
+
+ tasksChanSize := opt.WorkersCount
+ if tasksChanSize > len(reqs) {
+ tasksChanSize = len(reqs)
+ }
+
+ tasksChan := make(chan GenericContainerRequest, tasksChanSize)
+ resultsChan := make(chan parallelContainersResult, tasksChanSize)
+ done := make(chan struct{})
+
+ var wg sync.WaitGroup
+ wg.Add(tasksChanSize)
+
+ // run workers
+ for i := 0; i < tasksChanSize; i++ {
+ go parallelContainersRunner(ctx, tasksChan, resultsChan, &wg)
+ }
+
+ var errs []ParallelContainersRequestError
+ containers := make([]Container, 0, len(reqs))
+ go func() {
+ defer close(done)
+ for res := range resultsChan {
+ if res.Error != nil {
+ errs = append(errs, res.ParallelContainersRequestError)
+ } else {
+ containers = append(containers, res.Container)
+ }
+ }
+ }()
+
+ for _, req := range reqs {
+ tasksChan <- req
+ }
+ close(tasksChan)
+
+ wg.Wait()
+
+ close(resultsChan)
+
+ <-done
+
+ if len(errs) != 0 {
+ return containers, ParallelContainersError{Errors: errs}
+ }
+
+ return containers, nil
+}