"select" on multiple Python multiprocessing Queues?

It doesn't look like there's an official way to handle this yet. Or at least, not based on this:

  • http://bugs.python.org/issue3831

You could try something like what this post is doing -- accessing the underlying pipe filehandles:

  • http://haltcondition.net/?p=2319

and then use select.


Seems like using threads which forward incoming items to a single Queue which you then wait on is a practical choice when using multiprocessing in a platform independent manner.

Avoiding the threads requires either handling low-level pipes/FDs which is both platform specific and not easy to handle consistently with the higher-level API.

Or you would need Queues with the ability to set callbacks which i think are the proper higher level interface to go for. I.e. you would write something like:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Maybe the multiprocessing package could grow this API but it's not there yet. The concept works well with py.execnet which uses the term "channel" instead of "queues", see here http://tinyurl.com/nmtr4w


Actually you can use multiprocessing.Queue objects in select.select. i.e.

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

would select que only if it is ready to be read from.

No documentation about it though. I was reading the source code of the multiprocessing.queue library (at linux it's usually sth like /usr/lib/python2.6/multiprocessing/queue.py) to find it out.

With Queue.Queue I didn't have found any smart way to do this (and I would really love to).


Not sure how well the select on a multiprocessing queue works on windows. As select on windows listens for sockets and not file handles, I suspect there could be problems.

My answer is to make a thread to listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.

My code for doing this is:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

The following test routine shows how to use it:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

Hope this helps.

Tony Wallace