kotlin coroutines - use main thread in run blocking
I tested the solution with Java 8 parallel streams:
jobs.parallelStream().forEach { it.execute() }
I found the CPU utilization to be reliably on 100%. For reference, I used this computation job:
class MyJob {
fun execute(): Double {
val rnd = ThreadLocalRandom.current()
var d = 1.0
(1..rnd.nextInt(1_000_000)).forEach { _ ->
d *= 1 + rnd.nextDouble(0.0000001)
}
return d
}
}
Note that its duration varies randomly from zero up to the time it takes to perform 100,000,000 FP multiplications.
Out of curiosity I also studied the code you added to your question as the solution that works for you. I found a number of issues with it, such as:
- accumulating all the results into a list instead of processing them as they become available
- closing the result channel immediately after submitting the last job instead of waiting for all the results
I wrote some code of my own and added code to benchmark the Stream API one-liner against it. Here it is:
const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }
fun parallelStream(): Double =
jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })
fun channels(): Double {
val resultChannel = Channel<Double>(UNLIMITED)
val mainComputeChannel = Channel<MyJob>()
val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
GlobalScope.actor<MyJob>(Dispatchers.Default) {
for (job in channel) {
job.execute().also { resultChannel.send(it) }
}
}
}
val allComputeChannels = poolComputeChannels + mainComputeChannel
// Launch a coroutine that submits the jobs
GlobalScope.launch {
jobs.forEach { job ->
select {
allComputeChannels.forEach { chan ->
chan.onSend(job) {}
}
}
}
}
// Run the main loop which takes turns between running a job
// submitted to the main thread channel and receiving a result
return runBlocking {
var completedCount = 0
var sum = 0.0
while (completedCount < NUM_JOBS) {
select<Unit> {
mainComputeChannel.onReceive { job ->
job.execute().also { resultChannel.send(it) }
}
resultChannel.onReceive { result ->
sum += result
completedCount++
}
}
}
sum
}
}
fun main(args: Array<String>) {
measure("Parallel Stream", ::parallelStream)
measure("Channels", ::channels)
measure("Parallel Stream", ::parallelStream)
measure("Channels", ::channels)
}
fun measure(task: String, measuredCode: () -> Double) {
val block = { print(measuredCode().toString().substringBefore('.')) }
println("Warming up $task")
(1..20).forEach { _ -> block() }
println("\nMeasuring $task")
val average = (1..20).map { measureTimeMillis(block) }.average()
println("\n$task took $average ms")
}
Here's my typical result:
Parallel Stream took 396.85 ms
Channels took 398.1 ms
The results are similar, but one line of code still beats 50 lines of code :)