Close multiple goroutines if an error occurs in one in Go
You may use the context package, which was created for things like this ("carries deadlines, cancelation signals...").
You create a context capable of publishing cancelation signals with context.WithCancel() (the parent context may be the one returned by context.Background()). This returns a cancel() function which may be used to cancel (or more precisely, to signal the cancel intent to) the worker goroutines.
In the worker goroutines you have to check whether such intent has been signaled, by checking if the channel returned by Context.Done() is closed. This is easiest done by attempting to receive from it (which proceeds immediately if it is closed). To do a non-blocking check (so you can continue if it is not closed), use the select statement with a default branch.
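As a minimal sketch of that non-blocking check in isolation (the helper names isCanceled and checkAroundCancel are hypothetical and not part of the context package):

```go
package main

import (
	"context"
	"fmt"
)

// isCanceled reports whether ctx has been canceled, without blocking.
// The name is illustrative; it is not part of the context package.
func isCanceled(ctx context.Context) bool {
	select {
	case <-ctx.Done(): // Done() channel is closed: cancelation was signaled
		return true
	default: // channel still open: do not block, just report false
		return false
	}
}

// checkAroundCancel returns the helper's result before and after cancel().
func checkAroundCancel() (before, after bool) {
	ctx, cancel := context.WithCancel(context.Background())
	before = isCanceled(ctx)
	cancel()
	after = isCanceled(ctx)
	return before, after
}

func main() {
	before, after := checkAroundCancel()
	fmt.Println(before, after) // false true
}
```

Without the default branch, the select would block until the context is canceled, which is what you want when waiting for cancelation but not when polling between units of work.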
I will use the following work() implementation, which simulates a 10% failure chance and 1 second of work:
func work(i int) (int, error) {
	if rand.Intn(100) < 10 { // 10% chance of failure
		return 0, errors.New("random error")
	}
	time.Sleep(time.Second)
	return 100 + i, nil
}
And doAllWork() may look like this:
func doAllWork() error {
	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Make sure it's called to release resources even if no errors

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			for j := 0; j < 10; j++ {
				// Check if any error occurred in any other goroutine:
				select {
				case <-ctx.Done():
					return // Error somewhere, terminate
				default: // The default branch is required to avoid blocking
				}
				result, err := work(j)
				if err != nil {
					fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
					cancel()
					return
				}
				fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
			}
		}(i)
	}
	wg.Wait()

	return ctx.Err()
}
This is how it can be tested:
func main() {
	rand.Seed(time.Now().UnixNano() + 1) // +1 'cause Playground's time is fixed
	fmt.Printf("doAllWork: %v\n", doAllWork())
}
Output (try it on the Go Playground):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
If there were no errors, e.g. when using the following work() function:
func work(i int) (int, error) {
	time.Sleep(time.Second)
	return 100 + i, nil
}
The output would look like this (try it on the Go Playground):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
Notes:
Basically we just used the Done() channel of the context, so it seems we could just as easily (if not even more easily) use a done channel instead of the Context, closing the channel to do what cancel() does in the above solution.
This is not true. That approach only works if exactly one goroutine may close the channel, but in our case any of the workers may do so, and attempting to close an already closed channel panics (see details here: How does a non initialized channel behave?). So you would have to ensure some kind of synchronization / exclusion around the close(done), which would make the code less readable and more complex. Actually this is exactly what the cancel() function does under the hood, hidden / abstracted away from your eyes, so cancel() may be called multiple times, which keeps your code simpler.
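To make the extra bookkeeping concrete, here is a minimal sketch of a plain done channel whose close is guarded by sync.Once. The doneSignal type, its methods, and cancelFromMany are hypothetical names; this is one possible way to get the exclusion, not a description of how the context package is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// doneSignal is a hypothetical helper that makes close(done) safe to call
// from several goroutines: sync.Once guarantees the close runs at most once.
type doneSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newDoneSignal() *doneSignal {
	return &doneSignal{ch: make(chan struct{})}
}

// Cancel may be called any number of times, from any goroutine.
func (d *doneSignal) Cancel() {
	d.once.Do(func() { close(d.ch) })
}

// Done returns a channel that is closed after the first Cancel call.
func (d *doneSignal) Done() <-chan struct{} {
	return d.ch
}

// cancelFromMany calls Cancel from n goroutines and reports whether the
// Done channel ended up closed. With a bare close(ch) instead of Cancel,
// the second close would panic.
func cancelFromMany(n int) bool {
	d := newDoneSignal()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.Cancel()
		}()
	}
	wg.Wait()

	select {
	case <-d.Done():
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(cancelFromMany(5)) // true
}
```

This is roughly the amount of code you would have to carry around yourself, which is why reusing context.WithCancel() tends to be the simpler choice.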
How to get and return the error(s) from the workers?
For this you may use an error channel:
errs := make(chan error, 2) // Buffer for 2 errors
And inside the workers when an error is encountered, send it on the channel instead of printing it:
result, err := work(j)
if err != nil {
	// No trailing newline in the error text; %w keeps the original error unwrappable.
	errs <- fmt.Errorf("Worker #%d during %d, error: %w", i, j, err)
	cancel()
	return
}
And once the workers have finished, if there was an error, return it (and nil otherwise):
// Return (first) error, if any:
if ctx.Err() != nil {
	return <-errs
}
return nil
Output this time (try this on the Go Playground):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
Note that I used a buffered channel with a buffer size equal to the number of workers, which ensures sending on it is always non-blocking. This also gives you the possibility to receive and process all errors, not just one (e.g. the first). Another option could be to use a buffered channel to hold only 1, and do a non-blocking send on it, which could look like this:
errs := make(chan error, 1) // Buffered only for the first error

// ...and inside the worker:
result, err := work(j)
if err != nil {
	// Non-blocking send: if the buffer is already full, drop this error.
	select {
	case errs <- fmt.Errorf("Worker #%d during %d, error: %w", i, j, err):
	default:
	}
	cancel()
	return
}
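The earlier remark that a buffer sized to the number of workers lets you receive and process all errors can be sketched as a complete program. This is an illustration under assumptions: work() is made deterministic through a hypothetical failAt parameter, and doAllWork() is changed to return a slice of errors, neither of which appears in the code above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// work is a stand-in for the real task; it fails on one chosen input
// (failAt is a hypothetical parameter used to keep the example deterministic).
func work(j, failAt int) (int, error) {
	if j == failAt {
		return 0, errors.New("simulated error")
	}
	return 100 + j, nil
}

// doAllWork runs the given number of workers and returns every error reported.
func doAllWork(workers, failAt int) []error {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// One slot per worker: each worker sends at most once, so sends never block.
	errs := make(chan error, workers)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				select {
				case <-ctx.Done():
					return // another worker failed
				default:
				}
				if _, err := work(j, failAt); err != nil {
					errs <- fmt.Errorf("worker #%d during %d: %w", i, j, err)
					cancel()
					return
				}
			}
		}(i)
	}

	wg.Wait()
	close(errs) // all senders have returned, so closing is safe

	var all []error
	for err := range errs { // drains the buffered errors, then stops
		all = append(all, err)
	}
	return all
}

func main() {
	for _, err := range doAllWork(2, 4) {
		fmt.Println(err)
	}
	fmt.Println(len(doAllWork(2, -1))) // 0: no input fails
}
```

How many errors the first call returns depends on scheduling: at least one worker fails at input 4, and the other may fail too if it reaches the same input before it notices the cancelation.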
A clearer way to go here is to use errgroup (documentation).
Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
You can check it out in this example (playground):
var g errgroup.Group
var urls = []string{
	"http://www.golang.org/",
	"http://www.google.com/",
	"http://www.somestupidname.com/",
}
for _, url := range urls {
	// Launch a goroutine to fetch the URL.
	url := url // https://golang.org/doc/faq#closures_and_goroutines
	g.Go(func() error {
		// Fetch the URL.
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
		}
		return err
	})
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
	fmt.Println("Successfully fetched all URLs.")
} else {
	// After all have run, at least one of them has returned an error!
	// But all of them had to finish their work first!
	// If you want to stop the other goroutines when one fails, read on!
	fmt.Println("Unsuccessfully fetched URLs.")
}
But beware: the phrase "The first call to return a non-nil error cancels the group" in the Go documentation is a little bit misleading.
In fact, an errgroup.Group created with a context (via the WithContext function) will call the cancel function of the context returned by WithContext when a goroutine in the group returns an error; otherwise nothing is done (read the source code here!).
So, if you want to stop your other goroutines, you must use the context returned by WithContext and handle it yourself inside them; errgroup will just cancel that context!
Here you can find an example.
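To illustrate the mechanism without depending on the real package, the sketch below imitates the behavior described above using only the standard library. It is a simplified stand-in, not the errgroup source: miniGroup, withContext, errBoom and run are hypothetical names, and the real errgroup.Group has more features.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// miniGroup is a hypothetical, stripped-down imitation of an errgroup.Group
// created with WithContext. It is NOT the real package.
type miniGroup struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
	cancel  context.CancelFunc
}

func withContext(parent context.Context) (*miniGroup, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &miniGroup{cancel: cancel}, ctx
}

// Go runs f in a goroutine. The first non-nil error is stored and the
// context is canceled; nothing else is done to the other goroutines.
func (g *miniGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				g.cancel()
			})
		}
	}()
}

// Wait blocks until all goroutines return, then reports the first error.
func (g *miniGroup) Wait() error {
	g.wg.Wait()
	g.cancel() // release the context's resources
	return g.err
}

var errBoom = errors.New("boom")

// run starts one failing worker and one slow worker that watches ctx.
// It reports whether the slow worker stopped early, plus the group's error.
func run() (bool, error) {
	g, ctx := withContext(context.Background())
	stoppedEarly := false

	g.Go(func() error { return errBoom })
	g.Go(func() error {
		select {
		case <-ctx.Done(): // another goroutine failed: stop voluntarily
			stoppedEarly = true
			return ctx.Err()
		case <-time.After(5 * time.Second): // simulated long work
			return nil
		}
	})

	err := g.Wait()
	return stoppedEarly, err
}

func main() {
	stoppedEarly, err := run()
	fmt.Println(err, stoppedEarly) // boom true
}
```

The slow worker ends early only because it selects on ctx.Done() itself; if it ignored the context, Wait would block for the full five seconds even though the first worker had already failed.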
To summarize, errgroup can be used in different ways, as shown by the examples.
1. "Just errors", as in the above example: Wait waits for all goroutines to end, and then returns the first non-nil error (if any) from them, or returns nil.
2. In parallel: You have to create the group with the WithContext function and use the context to manage cancelation. I created a playground example here with some sleeps! You have to manually stop each goroutine, but using the context you can end them when one of them cancels the context.
3. Pipelines (see more in the examples).