Why is incrementing a number in 10 Java threads not resulting in a value of 10?
Two reasons:
1. You're not waiting for the threads to finish; you're just shutting down the thread pool (that is, causing the thread pool to reject new tasks but continue to process existing tasks).
2. You're not establishing a happens-before relationship between the writes in the thread pool and the read in the main thread.
   You could do this by (amongst other methods):
   - Acquiring the semaphore before reading `a`;
   - Using `submit` instead of `execute` to get a `Future<?>` for each of the tasks submitted, and invoking the `Future.get()` method on all of the returned futures. It is documented in the Javadoc of `ExecutorService` that this establishes a happens-before.
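The `submit` / `Future.get()` option could look roughly like the sketch below. The class name `FutureGetDemo` and the method `runTasks` are made up for illustration and are not from the question's code. The counter is deliberately a plain `int`: the sketch relies on the single worker thread for ordering between tasks and on `Future.get()` for visibility in the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names are illustrative only.
class FutureGetDemo {
    // Plain field on purpose: visibility comes from Future.get(), not from volatile.
    private int a = 0;

    int runTasks(int taskCount) {
        // A single worker thread, so the increments themselves never overlap.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                // submit() returns a Future, unlike execute().
                futures.add(pool.submit(() -> { a++; }));
            }
            // Waiting on every Future both blocks until the task is done and
            // makes the task's writes visible to this thread.
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return a;
    }

    public static void main(String[] args) {
        System.out.println("The final value of a: " + new FutureGetDemo().runTasks(10));
    }
}
```

With more than one worker thread the `a++` itself would still race, so this sketch only addresses waiting and visibility, not atomicity.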
The first point is the "main" reason why `a` is coming out as zero: if I run it locally and wait for the thread pool to terminate, `a` comes out to 10.
However, just because it comes out as 10 doesn't mean the code works correctly without paying attention to the second point: you need to apply the Java Memory Model to have guarantees of correct functioning.
Issues

- Visibility - Multiple threads are accessing the same variable and the code does not have any visibility guarantees.
  - `volatile` can help with the visibility guarantee.
- Atomicity - Multiple threads are updating through `a++` or `b++` operations. These are not atomic operations. Each one is really a set of operations: 1. fetch `a`, 2. increment `a`, 3. update `a`. A context switch can happen between any of these steps and result in an incorrect value.
  - So `volatile` visibility alone is not enough for correctness.
  - Use `AtomicInteger` to guarantee atomicity of the increment operation.
  - `AtomicXXX` can guarantee atomicity of a single operation.
  - If there was a need to increment both `a` and `b` together, then some form of synchronization is needed.
- Communication - There is no communication between the main thread and the executor task threads to signal completion events.
  - `executorService.shutdown()` will not ensure this communication.
  - A latch can be used for this communication.
  - Or, as mentioned by Andy, `Future` can be used.
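For the case where `a` and `b` must change together, one possible approach is to guard both fields with the same lock. The sketch below uses `synchronized` methods. The class `PairedCounter` and its methods are invented for illustration, and the thread count and task count are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class; not part of the original question.
class PairedCounter {
    private int a = 0;
    private int b = 0;

    // Both fields change under the same monitor, so no thread can
    // observe a updated without b.
    synchronized void incrementBoth() {
        a++;
        b++;
    }

    // Reading under the same monitor gives a consistent pair.
    synchronized int[] snapshot() {
        return new int[] {a, b};
    }

    static int[] runTasks(int taskCount, int threads) {
        PairedCounter counter = new PairedCounter();
        CountDownLatch latch = new CountDownLatch(taskCount);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    counter.incrementBoth();
                    latch.countDown();
                });
            }
            // Wait until every task has reported completion.
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return counter.snapshot();
    }

    public static void main(String[] args) {
        int[] result = runTasks(1000, 8);
        System.out.println("a = " + result[0] + ", b = " + result[1]);
    }
}
```

Two separate `AtomicInteger` fields would keep each counter correct individually, but would not stop another thread from seeing `a` already incremented while `b` is not.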
Example code with `AtomicInteger` and `CountDownLatch`:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DemoApplicationTests {

    // AtomicInteger makes each increment atomic and visible across threads.
    final AtomicInteger a = new AtomicInteger(0);
    final AtomicInteger b = new AtomicInteger(0);

    void contextLoads() throws Exception {
        // One count per task; the main thread waits until all tasks have counted down.
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                add();
                bdd();
                latch.countDown();
            });
        }
        latch.await();
        executorService.shutdown();
        System.out.println("The final value of a:" + a);
        System.out.println("The final value of b:" + b);
    }

    public void add() {
        a.incrementAndGet();
    }

    public void bdd() {
        b.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        new DemoApplicationTests().contextLoads();
    }
}
An incorrect solution with thread pool size > 1 and `CompletableFuture`, due to race conditions in `a++` and `b++`.

The following may be perfectly legal code for a thread pool size of 1 (my knowledge is limited and I can't confirm either way); it is copied from Eugene's answer. But when the same code is executed with a thread pool size > 1, it results in race conditions. (Again, the intention is to discuss multiple threads and data visibility issues as they are, not to present Eugene's answer as incorrect. Eugene's answer is in the context of a single thread in the thread pool and might be perfectly valid for that scenario.)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DemoApplicationTests {

    // Plain ints: the increments below are intentionally unsynchronized.
    int a = 0;
    int b = 0;

    void contextLoads() throws Exception {
        final int count = 10000;
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        List<Runnable> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Runnable r = () -> {
                add();
                bdd();
            };
            list.add(r);
        }
        CompletableFuture<?>[] futures = list.stream()
                .map(task -> CompletableFuture.runAsync(task, executorService))
                .toArray(CompletableFuture[]::new);
        // Waits for all tasks to complete, but does not make a++ / b++ atomic.
        CompletableFuture.allOf(futures).join();
        executorService.shutdown();
        System.out.println("The final value of a: " + a);
        System.out.println("The final value of b:" + b);
    }

    public void add() {
        a++; // racy read-modify-write when several threads run this
    }

    public void bdd() {
        b++; // racy read-modify-write when several threads run this
    }

    public static void main(String[] args) throws Exception {
        new DemoApplicationTests().contextLoads();
    }
}
Thank you @Basil Bourque for fixing the grammatical errors
Your pool has 1 thread, and you submit 10 `Runnable`s to it. They will all pile up in a queue until it's their turn to execute. Instead of waiting for all of them to finish, you call `shutdown`, effectively saying: "no more tasks will this pool take". When exactly that is going to happen, and how many tasks have already been processed before the call to `shutdown` happened, is impossible to tell. As such, you get a very non-deterministic result. You could even see 10 as the output (sometimes), but that does not mean this is correct.
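Instead, you can wait for the pool to finish executing all of its tasks. Call `shutdown()` first, because `awaitTermination` only returns `true` once a shutdown has been requested and all tasks have completed; otherwise it simply blocks until the timeout elapses:
executorService.shutdown();
executorService.awaitTermination(2, TimeUnit.SECONDS);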
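A minimal sketch of waiting for termination, under the assumption that the counter is an `AtomicInteger` so that visibility does not depend on what `awaitTermination` guarantees. The class `AwaitTerminationDemo` and the method `runTasks` are illustrative names, and the 2-second timeout is simply carried over from above. Note that `shutdown()` is called before `awaitTermination`, and the boolean result is checked rather than ignored.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are illustrative only.
class AwaitTerminationDemo {

    static int runTasks(int taskCount) {
        // AtomicInteger provides its own atomicity and visibility guarantees.
        AtomicInteger a = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(a::incrementAndGet);
        }
        // Stop accepting new tasks; already queued tasks still run.
        pool.shutdown();
        try {
            // Returns false if the timeout elapsed before all tasks finished.
            if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish within the timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return a.get();
    }

    public static void main(String[] args) {
        System.out.println("The final value of a: " + runTasks(10));
    }
}
```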
What slightly "sucks" is that `awaitTermination` does not explicitly mention that if it returns `true`, it establishes a happens-before relationship. So to be pedantic with the JLS, you would need to work with that `Semaphore`, for example, to establish the needed guarantees.
You have a race in your code, by updating a shared `a` and `b` from multiple threads (even if you currently use `Executors.newFixedThreadPool(1)`), without any synchronization. So that needs correction also. And a `Semaphore semaphore = new Semaphore(3);` is not going to help, since you will still allow 3 concurrent threads to work on those variables; you would need only a single permit. But then, this acts as a `Lock` more than a `Semaphore`.
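As a rough illustration of the single-permit idea, the sketch below guards every access to the counter, including the final read, with a `Semaphore` of one permit. The class `SinglePermitDemo`, its methods, and the use of a `CountDownLatch` to wait for completion are assumptions made for the example, not the question's actual code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical demo class; names are illustrative only.
class SinglePermitDemo {
    // One permit: at most one thread is inside add() or read() at a time.
    private final Semaphore semaphore = new Semaphore(1);
    private int a = 0;

    private void add() {
        semaphore.acquireUninterruptibly();
        try {
            a++;
        } finally {
            semaphore.release();
        }
    }

    // Acquiring before reading pairs with the release() calls in add().
    private int read() {
        semaphore.acquireUninterruptibly();
        try {
            return a;
        } finally {
            semaphore.release();
        }
    }

    int runTasks(int taskCount, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    add();
                    latch.countDown();
                });
            }
            // Wait until every task has finished before reading.
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return read();
    }

    public static void main(String[] args) {
        System.out.println("The final value of a: " + new SinglePermitDemo().runTasks(1000, 3));
    }
}
```

Used this way the semaphore behaves like a mutex, which is why a `ReentrantLock` or a `synchronized` block would usually express the same intent more directly.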