Producer/consumer multithreading
You seem to have done a pretty good job here. Not much to nitpick, actually. One thing I would recommend is to avoid synchronizing on the buffer object itself. In this case it's OK, but if you switch to a data-structure buffer instead, then depending on the class it might be synchronized internally (e.g. Vector, although it's obsolete by now), so acquiring a lock from the outside might interfere with its own locking.
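To make the locking advice concrete, here is a minimal sketch that guards a plain list with a dedicated private lock instead of locking on the list itself. The SharedBuffer class and its method names are made up for this example and are not from your code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class for illustration; not taken from the original code.
class SharedBuffer {
    // Dedicated lock object: nothing outside this class can synchronize on it.
    private final Object lock = new Object();
    private final List<Integer> items = new ArrayList<>();

    void add(int value) {
        synchronized (lock) {
            items.add(value);
        }
    }

    int size() {
        synchronized (lock) {
            return items.size();
        }
    }
}
```

Because the lock never leaves the class, swapping the ArrayList for some other collection later does not change who holds which monitor.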
Edit: Bhaskar makes a good point about using a while to wrap calls to wait. This is because of the infamous spurious wake-ups that can occur, forcing the thread to come out of wait prematurely, so you need to re-check the condition and make sure it goes back in.
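A minimal sketch of the guarded-wait idiom, assuming a simple one-shot flag; the Gate class, its isOpen field and its methods are hypothetical names used only for illustration.

```java
// Hypothetical one-shot gate for illustration; not taken from the original code.
class Gate {
    private final Object lock = new Object();
    private boolean isOpen = false; // the condition being waited for

    void await() throws InterruptedException {
        synchronized (lock) {
            // "while", not "if": after any wake-up, spurious or not,
            // the condition is checked again before proceeding.
            while (!isOpen) {
                lock.wait();
            }
        }
    }

    void open() {
        synchronized (lock) {
            isOpen = true;    // record the state change first
            lock.notifyAll(); // then wake up any waiting threads
        }
    }
}
```

Since the state lives in the flag rather than in the notification, a thread that calls await() after open() has already run simply sees the flag and does not block.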
What you could do next is to implement a finite-buffer producer consumer: have some shared data structure e.g. a linked list and set a maximum size (e.g. 10 items). Then let the producer keep producing and only suspend it whenever there are 10 items in the queue. The consumer will be suspended whenever the buffer is empty.
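Such a finite buffer could look roughly like the sketch below. It is only one way to do it: the BoundedBuffer class name, the Integer payload and the use of notifyAll are my assumptions, and the capacity is passed in by the caller (10 in the description above).

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical finite buffer for illustration; not taken from the original code.
class BoundedBuffer {
    private final Object lock = new Object();
    private final Queue<Integer> items = new LinkedList<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(int value) throws InterruptedException {
        synchronized (lock) {
            // Producer is suspended while the buffer is full.
            while (items.size() == capacity) {
                lock.wait();
            }
            items.add(value);
            lock.notifyAll(); // a consumer may be waiting for data
        }
    }

    int take() throws InterruptedException {
        synchronized (lock) {
            // Consumer is suspended while the buffer is empty.
            while (items.isEmpty()) {
                lock.wait();
            }
            int value = items.remove();
            lock.notifyAll(); // a producer may be waiting for free space
            return value;
        }
    }
}
```

notifyAll is used here because producers and consumers wait on the same lock, so a single notify could wake a thread of the wrong kind.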
The next step you could take is learning how to automate the process you have implemented manually. Take a look at BlockingQueue, which provides a buffer with blocking behavior (i.e. the consumer will automatically block if the buffer is empty and the producer will block if it's full).
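To see the "full" side of that behavior without hanging the demo, this small sketch uses ArrayBlockingQueue with a capacity of 2 and the timed offer() instead of put(). The class name BoundedQueueDemo and the tiny capacity are just choices for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration.
class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // capacity 2 for the demo
        queue.put(1);
        queue.put(2);
        // The queue is now full. put(3) would block until a consumer takes something,
        // so the timed offer() is used to give up after 100 ms instead.
        boolean accepted = queue.offer(3, 100, TimeUnit.MILLISECONDS);
        System.out.println("third item accepted: " + accepted); // false, queue was full
        System.out.println("took: " + queue.take());            // 1, FIFO order
    }
}
```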
Also, depending on the situation, executors (look at ExecutorService) can be a worthy replacement, since they encapsulate a task queue and one or more workers (consumers), so all you need is the producer.
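As a rough sketch of that idea, the code below plays the producer by submitting tasks to a fixed pool of two workers and then collecting the results. The ExecutorDemo class, the squareSum method and the pool size are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class for illustration.
class ExecutorDemo {
    // Submits n tasks to a pool of 2 workers and returns the sum of their results.
    static int squareSum(int n) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int value = i;
                // The pool queues the task internally; a worker thread picks it up.
                results.add(pool.submit(() -> value * value));
            }
            int sum = 0;
            for (Future<Integer> result : results) {
                sum += result.get(); // blocks until that task has finished
            }
            return sum;
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squareSum(5)); // 0 + 1 + 4 + 9 + 16 = 30
    }
}
```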
Is the above correct?
The only problem I see is what has been mentioned by @Tudor and @Bhaskar. Whenever you are testing for a condition while waiting for it, you must use a while loop. However, this is more about race conditions with multiple producers and consumers. Spurious wakeups can happen, but the race conditions are much more likely. See my page on the subject.
Yes, you only have 1 producer and 1 consumer but you may try to extend your code for multiple consumers or copy your code to another scenario.
I have learned not to claim correctness from "a successful run". On the contrary, I have become very suspicious of pseudo-parallel code!
Good instinct.
How could I test parallel code as I code my next attempts?
This is very hard. Scaling it up is one way: add multiple producers and consumers and see if there are problems. Running on multiple architectures with different numbers/types of processors is another. Your best defense will be code correctness: tight synchronization and good use of BlockingQueue, ExecutorService, etc. classes to make your code simpler/cleaner.
No easy answer. Testing multithreaded code is extremely hard.
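As one possible shape for the "scale it up" approach, here is a sketch of a small stress harness: several producers and consumers share a bounded queue, and the total consumed is compared against what was produced. Everything in it (the StressTest class, the POISON value, the queue capacity of 10) is an assumption for the example. A passing run does not prove correctness, but a failing one is a definite bug.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stress harness for illustration.
class StressTest {
    static final int POISON = -1; // end-of-data token, one per consumer

    // Each producer puts 1..itemsPerProducer; returns the sum of everything consumed.
    static long run(int producers, int consumers, int itemsPerProducer)
            throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        AtomicLong consumedSum = new AtomicLong();
        Thread[] consumerThreads = new Thread[consumers];
        Thread[] producerThreads = new Thread[producers];
        for (int c = 0; c < consumers; c++) {
            consumerThreads[c] = new Thread(() -> {
                try {
                    while (true) {
                        int item = queue.take();
                        if (item == POISON) break;
                        consumedSum.addAndGet(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads[c].start();
        }
        for (int p = 0; p < producers; p++) {
            producerThreads[p] = new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsPerProducer; i++) queue.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producerThreads[p].start();
        }
        for (Thread t : producerThreads) t.join();
        // All real items are queued; now tell every consumer to stop.
        for (int c = 0; c < consumers; c++) queue.put(POISON);
        for (Thread t : consumerThreads) t.join();
        return consumedSum.get();
    }
}
```

With 4 producers each producing 1..1000, the expected total is 4 * (1000 * 1001 / 2) = 2002000; anything else means items were lost or duplicated.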
Which tools can help me in both development and debugging?
In terms of general stuff, I'd look into a coverage tool like Emma so you can make sure your unit tests are covering all of your code.
In terms of multithreaded code testing, get to know how to read kill -QUIT thread dumps and look at running threads inside of JConsole. Java profilers like YourKit may also help.
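If you want a feel for what a thread dump contains before reaching for the external tools, roughly similar information is available from inside the JVM through Thread.getAllStackTraces(). The sketch below is not a replacement for kill -QUIT (it does not show lock ownership, for example); the ThreadDump class and its output format are made up for illustration.

```java
import java.util.Map;

// Hypothetical helper for illustration; the output format is invented.
class ThreadDump {
    static String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            // Name and state (RUNNABLE, WAITING, BLOCKED, ...) of each live thread.
            sb.append('"').append(thread.getName()).append("\" state=")
              .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

A consumer stuck in wait() shows up here in state WAITING with Object.wait at the top of its stack, which is the same thing you would look for in a real dump.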
Would the approach change if I allowed the Producer to continue producing, with each production taking some variable amount of time...
I don't think so. The consumer will wait on the producer forever. Maybe I'm not understanding this question?
Is this method of doing things obsolete and should I rather be learning something else? From this tollbooth, I have no idea of what happens "in the real world of Java"
Learning about the ExecutorService classes is next. These handle a large percentage of the new Thread() style code, especially when you are dealing with a number of asynchronous tasks being executed with threads. Here's a tutorial.
Where should I go from here?
Again, ExecutorService. I assume you've read the starting docs. As @Bhaskar mentioned, Java Concurrency in Practice is a good bible.
Here are some general comments about your code:
The SharedSpace and Threaded classes seem like a contrived way to do this. If you are playing around with base classes and the like then fine. But in general, I never use a pattern like this. A producer and consumer are usually working with a BlockingQueue like LinkedBlockingQueue, in which case the synchronization and volatile payloads are taken care of for you. Also, I tend to inject shared information into an object constructor as opposed to getting it from a base class.
Typically if I am using synchronized it is on a private final field. Often I create a private final Object lockObject = new Object(); for locking unless I am working with an object already.
Be careful of huge synchronized blocks and of putting log messages inside of synchronized sections. Logs usually do synchronized IO to the file-system, which can be very expensive. You should have small, very tight, synchronized blocks if possible.
You define consumedData outside of the loop. I would define it at the point of the assignment and then use a break to bail from the loop if it is == -1. Make sure to limit your local variables' scope if at all possible.
Your logging messages are going to dominate your code performance. This means that when you remove them, your code is going to perform completely differently. This is very important to realize when you go to debug problems with it. The performance will also (most likely) change when you move to a different architecture with different CPUs/cores.
You probably know this, but when you call sharedSpace.notify();, that only means that another thread is notified if it is currently in sharedSpace.wait();. If it is doing something else then it will miss the notification. Just FYI.
It's a little strange to do an if (nIterations <= N_ITERATIONS), and then 3 lines below the else do it again. Duplicating the notify() would be better to simplify the branching.
You have int nIterations = 0; then a while then inside a ++. That's a recipe for a for loop: for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
Here's a much tighter version of your code. This is just an example of how I would write it. Again, aside from the missing while there seems to be nothing wrong with your version.
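import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    // logger is assumed to be defined as in your original code
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until an item is available
                int consumedData = queue.take();
                if (consumedData == Producer.FINAL_VALUE) {
                    logger.info("Consumed: END (end of data production token).");
                    break;
                }
                logger.info("Consumed: {}.", consumedData);
            }
        } catch (InterruptedException e) {
            // take() can be interrupted: restore the flag and stop consuming
            Thread.currentThread().interrupt();
        }
        logger.info("Signing off.");
    }
}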
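import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    // logger and N_ITERATIONS are assumed to be defined as in your original code
    public static final int FINAL_VALUE = -1;
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
                logger.info("Produced: {}", nIterations);
                // put() blocks if the queue is bounded and currently full
                queue.put(nIterations);
            }
            queue.put(FINAL_VALUE);
            logger.info("Produced: END (end of data production token).");
        } catch (InterruptedException e) {
            // put() can be interrupted: restore the flag and stop producing
            Thread.currentThread().interrupt();
        }
        logger.info("Signing off.");
    }
}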
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
        // you can add an int argument to the LinkedBlockingQueue constructor
        // to only allow a certain number of items in the queue at one time
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
        Thread producer = new Thread(new Producer(queue), "Producer");
        Thread consumer = new Thread(new Consumer(queue), "Consumer");
        // start both threads, then wait for them to finish
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}