Producer/Consumer threads using a Queue

Java 5+ has all the tools you need for this kind of thing. You will want to:

  1. Put all your Producers in one ExecutorService;
  2. Put all your Consumers in another ExecutorService;
  3. If necessary, communicate between the two using a BlockingQueue.

I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

So the producers submit directly to consumers.


OK, as others note, the best thing to do is to use java.util.concurrent package. I highly recommend "Java Concurrency in Practice". It's a great book that covers almost everything you need to know.

As for your particular implementation, as I noted in the comments, don't start Threads from Constructors -- it can be unsafe.

Leaving that aside, the second implementation seem better. You don't want to put queues in static fields. You are probably just loosing flexibility for nothing.

If you want to go ahead with your own implementation (for learning purpose I guess?), supply a start() method at least. You should construct the object (you can instantiate the Thread object), and then call start() to start the thread.

Edit: ExecutorService have their own queue so this can be confusing.. Here's something to get you started.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

Further EDIT: For producer, instead of while(true), you can do something like:

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

This way you can shutdown the executor by calling .shutdownNow(). If you'd use while(true), it won't shutdown.

Also note that the Producer is still vulnerable to RuntimeExceptions (i.e. one RuntimeException will halt the processing)


You are reinventing the wheel.

If you need persistence and other enterprise features use JMS (I'd suggest ActiveMq).

If you need fast in-memory queues use one of the impementations of java's Queue.

If you need to support java 1.4 or earlier, use Doug Lea's excellent concurrent package.


I have extended cletus proposed answer to working code example.

  1. One ExecutorService (pes) accepts Producer tasks.
  2. One ExecutorService (ces) accepts Consumer tasks.
  3. Both Producer and Consumer shares BlockingQueue.
  4. Multiple Producer tasks generates different numbers.
  5. Any of Consumer tasks can consume number generated by Producer

Code:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

output:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

Note. If you don't need multiple Producers and Consumers, keep single Producer and Consumer. I have added multiple Producers and Consumers to showcase capabilities of BlockingQueue among multiple Producers and Consumers.