Multiple blocking queues, single consumer
One trick that you could do is to have a queue of queues. So what you'd do is have a single blocking queue which all threads subscribe to. Then when you enqueue something into one of your BlockingQueues, you also enqueue your blocking queue on this single queue. So you would have something like:
BlockingQueue<WorkItem> producers[] = new BlockingQueue<WorkItem>[NUM_PRODUCERS];
BlockingQueue<BlockingQueue<WorkItem>> producerProducer = new BlockingQueue<BlockingQueue<WorkItem>>();
Then when you get a new work item:
void addWorkItem(int queueIndex, WorkItem workItem) {
assert queueIndex >= 0 && queueIndex < NUM_PRODUCERS : "Pick a valid number";
//Note: You may want to make the two operations a single atomic operation
producers[queueIndex].add(workItem);
producerProducer.add(producers[queueIndex]);
}
Now your consumers can all block on the producerProducer. I am not sure how valuable this strategy would be, but it does accomplish what you want.
Polymorphism
This can be solved using polymorphism, imagine you have two jobs called JobOne
and JobTwo
and you are going to consume these by a consumer. You should define an interface called jobs like:
interface Jobs {
...
}
And then you can implement you jobs by this interface like:
class JobOne implements Job {
...
}
class JobTwo implements Job {
...
}
Now you can use Jobs
for your blocking queue to add two kinds of jobs to consume.
BlockingQueue<Jobs> Jobs queue = new BlockingQueue<Jobs>();
The LinkedBlockingMultiQueue does what you are asking for. It does not allow the consumer to block on arbitrary BlockingQueues, but it is possible to create "sub queues" from a single "multi queue" and achieve the same effect. Producers offer in the sub queues, and consumers can block themselves polling the single multi queue, waiting for any element.
It also supports priorities, that is, taking elements from some queues before considering others.
Example:
LinkedBlockingMultiQueue<Int, String> q = new LinkedBlockingMultiQueue<>();
q.addSubQueue(1 /* key */, 10 /* priority */);
q.addSubQueue(2 /* key */, 10 /* priority */);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq1 = q.getSubQueue(1);
LinkedBlockingMultiQueue<Int, String>.SubQueue sq2 = q.getSubQueue(2);
Then you can offer and poll:
sq1.offer("x1");
q.poll(); // "x1"
sq2.offer("x2");
q.poll(); // "x2"
Disclaimer: I am the author of the library.