Using goroutines to process values and gather results into a slice
There are two problems
- Using unbuffered channels: Unbuffered channels block receivers until data is available on the channel and senders until a receiver is available.That caused the error
- Not closing the channel before range: As you never close the ch channel, the range loop will never finish.
You have to use a buffered
channel and close
the channel before range
Code
package main
import (
"fmt"
"sync"
)
func double(line int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- line * 2
}
func main() {
contents := []int{1, 2, 3, 4, 5}
sampleChan := make(chan int,len(contents))
var wg sync.WaitGroup
// Read from contents list
for _, line := range contents {
wg.Add(1)
go double(line, sampleChan, &wg)
}
wg.Wait()
close(sampleChan)
// Read from sampleChan and put into a slice
var sampleList []int
for s := range sampleChan {
sampleList = append(sampleList, s)
}
fmt.Println(sampleList)
}
Play link : https://play.golang.org/p/k03vt3hd3P
EDIT:
Another approach for better performance would be to run producer
and consumer
at concurrently
Modified code
package main
import (
"fmt"
"sync"
)
func doubleLines(lines []int, wg *sync.WaitGroup, sampleChan chan int) {
defer wg.Done()
defer close(sampleChan)
var w sync.WaitGroup
for _, line := range lines {
w.Add(1)
go double(&w, line, sampleChan)
}
w.Wait()
}
func double(wg *sync.WaitGroup, line int, ch chan int) {
defer wg.Done()
ch <- line * 2
}
func collectResult(wg *sync.WaitGroup, channel chan int, sampleList *[]int) {
defer wg.Done()
for s := range channel {
*sampleList = append(*sampleList, s)
}
}
func main() {
contents := []int{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
sampleChan := make(chan int, 1)
var sampleList []int
var wg sync.WaitGroup
wg.Add(1)
go doubleLines(contents, &wg, sampleChan)
wg.Add(1)
go collectResult(&wg, sampleChan, &sampleList)
wg.Wait()
fmt.Println(sampleList)
}
play link: https://play.golang.org/p/VAe7Qll3iVM
Your code is almost correct. There's a couple of problems: first, you're waiting for all the workers to finish before collecting the results, and second your for
loop terminates when the channel is closed, but the channel is closed only after the for
loop terminates.
You can fix the code by asynchronously closing the channel when the workers are finished:
for i, line := range contents {
wg.Add(1)
// Process each item with a goroutine and send output to sampleChan
go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
go func() {
wg.Wait()
close(sampleChan)
}()
for s := range sampleChan {
..
}
As a note of style (and following https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions), it'd be preferable if newSample
was a simple, synchronous function that didn't take the waitgroup and channel, and simply generated its result. Then the worker code would look like:
for i, line := range contents {
wg.Add(1)
go func(line string) {
defer wg.Done()
sampleChan <- newSample(line, *replicatePtr, *timePtr)
}(line)
}
This keeps your concurrency primitives all together, which apart from simplifiying newSample
and making it easier to test, it allows you to see what's going on with the concurrency, and visually check that wg.Done()
is always called. And if you want to refactor the code to for example use a fixed number of workers, then your changes will all be local.