Why does this Scala code execute two Futures on one thread?
It is easier to understand if you expand the Futures into Promises:
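import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Success

// Assumption: ec is the single-threaded ExecutionContext from the question
val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
val p1 = Promise[Future[Int]]()
ec.execute(() => {
  // the first task starts running
  val p2 = Promise[Int]()
  // the second task is submitted, but does not run yet
  ec.execute(() => {
    p2.complete(Success(1))
    println(s"task 2 -> p1:${p1},p2:${p2}")
  })
  // p1 is completed here; it does not wait for p2.future to finish
  p1.complete(Success(p2.future))
  println(s"task 1 -> p1:${p1},p2:${p2}") // p1 is completed, but p2 is not yet
  // the first task finishes; only now does the second task run
})
val result: Future[Future[Int]] = p1.future
Thread.sleep(1000)
println(result)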
The reason your code works is that both futures are executed by the same thread. The ExecutionContext that you are creating does not use a Thread directly for each Future, but instead schedules tasks (Runnable instances) to be executed. If no more threads are available in the pool, these tasks are put into a BlockingQueue, where they wait to be executed. (See the ThreadPoolExecutor API for details.)
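As a minimal sketch of this behaviour (the object name `SameThreadDemo` and the helper `run` are made up for illustration, and the pool is assumed to be the single-threaded one from the question), you can record which thread runs the outer and the inner Future:

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object SameThreadDemo {
  // Returns the names of the threads that ran the outer and the inner Future.
  def run(): (String, String) = {
    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val outer: Future[(String, Future[String])] = Future {
        // The inner task is only queued here; the single worker is still busy with this task.
        val inner = Future(Thread.currentThread().getName)
        (Thread.currentThread().getName, inner)
      }
      // Waiting happens on the calling thread, not on the pool thread.
      val (outerName, inner) = Await.result(outer, 5.seconds)
      (outerName, Await.result(inner, 5.seconds))
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (outerName, innerName) = run()
    println(s"outer ran on $outerName, inner ran on $innerName")
  }
}
```

Both names should be identical, because the inner task waits in the queue until the only worker thread has finished the outer task.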
If you look at the implementation of Executors.newFixedThreadPool(1), you'll see that it creates an Executor with an unbounded queue:
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
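To illustrate what the unbounded queue means in practice, here is a rough sketch that builds the same kind of executor by hand, blocks its only worker, and counts how many tasks pile up in the queue. The names `UnboundedQueueDemo` and `queuedWhileBusy` are hypothetical, and the latches are only there to make the timing deterministic:

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object UnboundedQueueDemo {
  // Submits `n` tasks while the single worker is blocked and returns how many are queued.
  def queuedWhileBusy(n: Int): Int = {
    val pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable]())
    val started = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    try {
      // Occupy the only worker thread until `release` is counted down.
      pool.execute(() => { started.countDown(); release.await() })
      started.await()
      // None of these is rejected: they all wait in the queue.
      (1 to n).foreach(_ => pool.execute(() => ()))
      pool.getQueue.size
    } finally {
      release.countDown()
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"queued tasks: ${queuedWhileBusy(1000)}")
}
```

With a LinkedBlockingQueue created without a capacity argument, every submitted task is accepted, so nested Futures on a single thread are simply delayed rather than rejected.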
To get the effect of thread-starvation that you were looking for, you could create an executor with a limited queue yourself:
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
Since the minimal capacity of ArrayBlockingQueue is 1, you would need three futures to reach the limit, and you would also need to add some code to be executed on the result of the future, to keep them from completing (in the example below I do this by adding .map(identity)).
The following example
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
  TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
def addOne(x: Int) = Future {
  x + 1
}
def addTwo(x: Int) = Future {
  addOne(x + 1).map(identity)
}
def addThree(x: Int) = Future {
  addTwo(x + 1).map(identity)
}
println(addThree(1))
fails with
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
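The executor state in that message (one active thread, one queued task) can also be reproduced without Futures. The following is only a sketch with plain Runnables; `BoundedQueueDemo` and `thirdTaskRejected` are hypothetical names, and it relies on the default rejection policy of ThreadPoolExecutor, which throws RejectedExecutionException:

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, RejectedExecutionException,
  ThreadPoolExecutor, TimeUnit}

object BoundedQueueDemo {
  // Returns true if a third task is rejected while one task runs and one waits in the queue.
  def thirdTaskRejected(): Boolean = {
    val pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](1))
    val started = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    try {
      // Task 1 occupies the only worker thread until `release` is counted down.
      pool.execute(() => { started.countDown(); release.await() })
      started.await()
      // Task 2 fills the single queue slot.
      pool.execute(() => ())
      try {
        // Task 3 finds no free thread and no free queue slot.
        pool.execute(() => ())
        false
      } catch {
        case _: RejectedExecutionException => true
      }
    } finally {
      release.countDown()
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"third task rejected: ${thirdTaskRejected()}")
}
```

In the Future-based example the rejected task is the callback registered by .map(identity), which is submitted while one task is still running and another is already waiting in the queue.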