-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync.go
More file actions
127 lines (108 loc) · 2.37 KB
/
async.go
File metadata and controls
127 lines (108 loc) · 2.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package async
import (
"context"
"sync"
)
// Task is a unit of work that can be run concurrently. It reports failure by
// returning a non-nil error and success by returning nil.
type Task func() error
// Run executes the given tasks concurrently, one goroutine per task, and
// returns a channel that delivers every non-nil error the tasks return.
//
// The channel is closed once all tasks have finished. It is buffered with
// room for one error per task, so a task never blocks while reporting its
// result: a caller that stops reading early (for example after the first
// error) does not leave goroutines stuck on a send. Errors that were not
// read stay in the channel until it is drained.
func Run(tasks ...Task) <-chan error {
	// Each task sends at most one error, so len(tasks) slots guarantee that
	// no send can block, with or without a reader.
	errc := make(chan error, len(tasks))

	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for _, task := range tasks {
		// Pass the task as an argument so every goroutine gets its own copy
		// regardless of the loop-variable semantics of the Go version in use.
		go func(run Task) {
			defer wg.Done()
			if err := run(); err != nil {
				errc <- err
			}
		}(task)
	}

	// Close the channel once every task has reported so that ranging over
	// it terminates.
	go func() {
		wg.Wait()
		close(errc)
	}()
	return errc
}
// RunForever executes task repeatedly on `concurrent` goroutines and returns
// a channel that delivers the non-nil errors task returns.
//
// Each worker checks ctx after every call to task. Once ctx is cancelled the
// worker reports ctx.Err() and exits, so task always runs at least once per
// worker. The channel is closed after all workers have exited. If concurrent
// is zero or negative, no worker is started and the channel is closed
// immediately.
//
// Cancelling ctx always releases the workers, even if the caller has stopped
// reading from the channel:
//   - a worker waiting to deliver a task error abandons that send when ctx
//     is cancelled (the pending task error is dropped);
//   - ctx.Err() is reported with a non-blocking send into a buffer that holds
//     one error per worker, so it is only dropped when the reader is already
//     `concurrent` errors behind.
//
// Without cancellation, a worker whose error cannot be buffered still waits
// for the caller to read it.
func RunForever(ctx context.Context, concurrent int, task Task) <-chan error {
	// make panics on a negative size; treat it like "no workers".
	size := concurrent
	if size < 0 {
		size = 0
	}
	errc := make(chan error, size)

	var wg sync.WaitGroup
	for c := 0; c < concurrent; c++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				if err := task(); err != nil {
					// Never block past cancellation: otherwise a worker
					// whose reader went away could not be stopped.
					select {
					case errc <- err:
					case <-ctx.Done():
					}
				}
				if ctx.Err() != nil {
					// Best-effort report; must not block on shutdown.
					select {
					case errc <- ctx.Err():
					default:
					}
					return
				}
			}
		}()
	}

	// Close the channel once every worker has exited so that ranging over
	// it terminates.
	go func() {
		wg.Wait()
		close(errc)
	}()
	return errc
}
// RunLimited executes task `count` times on each of `concurrent` goroutines
// and returns a channel that delivers the non-nil errors task returns. When
// ctx is never cancelled, task runs concurrent*count times in total.
//
// Each worker checks ctx after every call to task. Once ctx is cancelled the
// worker reports ctx.Err() and exits without running its remaining
// iterations. The channel is closed after all workers have exited. If
// concurrent is zero or negative, no worker is started and the channel is
// closed immediately.
//
// Cancelling ctx always releases the workers, even if the caller has stopped
// reading from the channel:
//   - a worker waiting to deliver a task error abandons that send when ctx
//     is cancelled (the pending task error is dropped);
//   - ctx.Err() is reported with a non-blocking send into a buffer that holds
//     one error per worker, so it is only dropped when the reader is already
//     `concurrent` errors behind.
//
// Without cancellation, a worker whose error cannot be buffered still waits
// for the caller to read it.
func RunLimited(ctx context.Context, concurrent int, count int, task Task) <-chan error {
	// make panics on a negative size; treat it like "no workers".
	size := concurrent
	if size < 0 {
		size = 0
	}
	errc := make(chan error, size)

	var wg sync.WaitGroup
	for c := 0; c < concurrent; c++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < count; i++ {
				if err := task(); err != nil {
					// Never block past cancellation: otherwise a worker
					// whose reader went away could not be stopped.
					select {
					case errc <- err:
					case <-ctx.Done():
					}
				}
				if ctx.Err() != nil {
					// Best-effort report; must not block on shutdown.
					select {
					case errc <- ctx.Err():
					default:
					}
					return
				}
			}
		}()
	}

	// Close the channel once every worker has exited so that ranging over
	// it terminates.
	go func() {
		wg.Wait()
		close(errc)
	}()
	return errc
}
// Wait blocks on errc and returns the first non-nil error it receives. If
// the channel is closed before any non-nil error arrives, Wait returns nil.
// Nil values received from the channel are skipped.
func Wait(errc <-chan error) error {
	for {
		received, open := <-errc
		switch {
		case !open:
			// Channel closed without a failure being reported.
			return nil
		case received != nil:
			return received
		}
	}
}
// HandleError starts a goroutine that calls handler for every non-nil error
// received on errc. The goroutine exits when errc is closed. HandleError
// itself returns immediately.
func HandleError(errc <-chan error, handler func(error)) {
	drain := func() {
		for {
			received, open := <-errc
			if !open {
				return
			}
			if received == nil {
				// Nil values are not failures; do not bother the handler.
				continue
			}
			handler(received)
		}
	}
	go drain()
}