summaryrefslogtreecommitdiff
path: root/vendor/github.com/testcontainers/testcontainers-go/parallel.go
blob: 0349023ba21a32324963dedcf0eca513454c5846 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package testcontainers

import (
	"context"
	"fmt"
	"sync"
)

const (
	// defaultWorkersCount is the number of workers ParallelContainers uses
	// when ParallelContainersOptions.WorkersCount is left at its zero value.
	defaultWorkersCount = 8
)

// ParallelContainerRequest is the list of container requests handed to
// ParallelContainers; each element is passed to GenericContainer by a worker.
type ParallelContainerRequest []GenericContainerRequest

// ParallelContainersOptions holds optional settings for ParallelContainers.
// The number of workers actually started is capped at the number of requests.
type ParallelContainersOptions struct {
	WorkersCount int // number of parallel workers; when zero, defaultWorkersCount is used
}

// ParallelContainersRequestError describes a single failed request from a
// parallel run: Request is the request that was submitted and Error is the
// error GenericContainer returned for it.
type ParallelContainersRequestError struct {
	Request GenericContainerRequest
	Error   error
}

// ParallelContainersError is the error returned by ParallelContainers when at
// least one request failed. Errors holds one entry per failed request.
type ParallelContainersError struct {
	Errors []ParallelContainersRequestError
}

// Error implements the error interface. The message is the list of failed
// requests rendered with fmt's default formatting.
func (pce ParallelContainersError) Error() string {
	failures := pce.Errors
	return fmt.Sprint(failures)
}

// parallelContainersResult is the outcome of one request processed by a
// worker. Container is the value GenericContainer returned. The embedded
// ParallelContainersRequestError is filled in only when the request failed,
// so a nil Error marks a successful result.
type parallelContainersResult struct {
	ParallelContainersRequestError
	Container Container
}

// parallelContainersRunner is the body of one worker goroutine. It receives
// requests until the requests channel is closed, calls GenericContainer for
// each one and sends the outcome on results. wg.Done is called when the
// worker exits.
func parallelContainersRunner(
	ctx context.Context,
	requests <-chan GenericContainerRequest,
	results chan<- parallelContainersResult,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	for {
		request, open := <-requests
		if !open {
			// The producer closed the channel: no more work.
			return
		}

		var outcome parallelContainersResult
		outcome.Container, outcome.Error = GenericContainer(ctx, request)
		if outcome.Error != nil {
			// Only failed results carry the originating request.
			outcome.Request = request
		}

		results <- outcome
	}
}

// ParallelContainers creates the containers described by reqs, calling
// GenericContainer for each request from a pool of worker goroutines.
//
// opt.WorkersCount sets the size of the pool. A zero or negative value selects
// defaultWorkersCount, and the pool never has more workers than there are
// requests.
//
// The returned slice holds the containers whose requests succeeded, in the
// order their results arrived, which need not match the order of reqs. If any
// request failed, the returned error is a ParallelContainersError listing each
// failed request with its error; the containers that were created
// successfully are still returned alongside it.
func ParallelContainers(ctx context.Context, reqs ParallelContainerRequest, opt ParallelContainersOptions) ([]Container, error) {
	// A negative count would panic when used as a channel size or WaitGroup
	// delta below, so treat every non-positive value as "not set".
	if opt.WorkersCount <= 0 {
		opt.WorkersCount = defaultWorkersCount
	}

	// Never start more workers than there are requests to process.
	tasksChanSize := opt.WorkersCount
	if tasksChanSize > len(reqs) {
		tasksChanSize = len(reqs)
	}

	tasksChan := make(chan GenericContainerRequest, tasksChanSize)
	resultsChan := make(chan parallelContainersResult, tasksChanSize)
	done := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(tasksChanSize)

	// run workers
	for i := 0; i < tasksChanSize; i++ {
		go parallelContainersRunner(ctx, tasksChan, resultsChan, &wg)
	}

	// Collect results in a single goroutine so errs and containers need no
	// locking; closing done publishes them to the caller below.
	var errs []ParallelContainersRequestError
	containers := make([]Container, 0, len(reqs))
	go func() {
		defer close(done)
		for res := range resultsChan {
			if res.Error != nil {
				errs = append(errs, res.ParallelContainersRequestError)
			} else {
				containers = append(containers, res.Container)
			}
		}
	}()

	// Feed every request to the workers, then signal that no more will come.
	for _, req := range reqs {
		tasksChan <- req
	}
	close(tasksChan)

	// Once all workers have exited nothing else sends on resultsChan, so it is
	// safe to close it and let the collector finish.
	wg.Wait()

	close(resultsChan)

	<-done

	if len(errs) != 0 {
		return containers, ParallelContainersError{Errors: errs}
	}

	return containers, nil
}