Get a unique ID for each worker in a Python multiprocessing pool
I did this with threading and ended up using a queue to handle job management. Here is the baseline. My complete version has a bunch of try/except blocks (particularly in the worker, to make sure that q.task_done() is called even on failure; a sketch of that hardened worker follows the baseline).
from threading import Thread
from queue import Queue
import time
import random

def run(idx, *args):
    # Simulate a job that takes up to a second.
    time.sleep(random.random() * 1)
    print(idx, ':', args)

def run_jobs(jobs, workers=1):
    q = Queue()

    def worker(idx):
        while True:
            args = q.get()
            run(idx, *args)
            q.task_done()

    for job in jobs:
        q.put(job)

    for i in range(0, workers):
        t = Thread(target=worker, args=[i])
        t.daemon = True
        t.start()

    q.join()

if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0, 10)], workers=5)
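For reference, that hardened worker closure might look something like this (a sketch based on the description above rather than my exact code; it reuses the q and run names from the baseline):

def worker(idx):
    while True:
        args = q.get()
        try:
            run(idx, *args)
        except Exception as exc:
            # Report but swallow the failure so one bad job doesn't kill the worker thread.
            print(idx, 'failed with:', exc)
        finally:
            # Mark the job done even on failure so q.join() can eventually return.
            q.task_done()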
I didn't need to use multiprocessing (my workers are just for calling an external process), but this could be extended. The multiprocessing API changes it a touch; here's how you could adapt:
from multiprocessing import Process, Queue
from queue import Empty
import time
import random

def run(idx, *args):
    # Simulate a job that takes up to a second.
    time.sleep(random.random() * 1)
    print(idx, ':', args)

def run_jobs(jobs, workers=1):
    q = Queue()

    # Note: a closure as the Process target only works with the
    # 'fork' start method (the default on Linux).
    def worker(idx):
        try:
            while True:
                # Stop once the queue has been empty for a second.
                args = q.get(timeout=1)
                run(idx, *args)
        except Empty:
            return

    for job in jobs:
        q.put(job)

    processes = []
    for i in range(0, workers):
        p = Process(target=worker, args=[i])
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

if __name__ == "__main__":
    run_jobs([('job', i) for i in range(0, 10)], workers=5)
Both versions will output something like:
0 : ('job', 0)
1 : ('job', 2)
1 : ('job', 6)
3 : ('job', 3)
0 : ('job', 5)
1 : ('job', 7)
2 : ('job', 1)
4 : ('job', 4)
3 : ('job', 8)
0 : ('job', 9)
You can use multiprocessing.Queue to store the ids and then get the id at initialization of the pool process.
Advantages:
- You do not need to rely on internals.
- If your use case is to manage resources/devices, you can put in the device number directly. This will also ensure that no device is used twice: if you have more processes in your pool than devices, the additional processes will block on queue.get() and will not perform any work (this won't block your program, or at least it did not when I tested).
Disadvantages:
- You have additional communication overhead, and spawning the pool processes takes a tiny bit longer: without the sleep(1) in the example, all work might be performed by the first process, as the others are not done initializing yet.
- You need a global (or at least I don't know a way around it).
Example:
import multiprocessing
from time import sleep

def init(queue):
    global idx
    # Each pool process pulls exactly one id off the queue when it starts.
    idx = queue.get()

def f(x):
    global idx
    process = multiprocessing.current_process()
    sleep(1)
    return (idx, process.pid, x * x)

ids = [0, 1, 2, 3]
manager = multiprocessing.Manager()
idQueue = manager.Queue()

for i in ids:
    idQueue.put(i)

# Module-level pool creation like this assumes the 'fork' start method;
# wrap it in an if __name__ == "__main__" guard on spawn-based platforms.
p = multiprocessing.Pool(8, init, (idQueue,))
print(p.map(f, range(8)))
Output:
[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)]
Note that there are only 4 different pids, although the pool contains 8 processes, and each idx is only used by one process.
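If the use case really is device management, the same pattern pins each worker to one device. Here's a hypothetical sketch (gpu_ids and use_gpu are made-up placeholders, not part of the example above):

import multiprocessing

def init(queue):
    global gpu_id
    # Each worker claims exactly one device id at startup.
    gpu_id = queue.get()

def use_gpu(x):
    # Placeholder for real work that would run on the claimed device.
    return (gpu_id, x * x)

if __name__ == "__main__":
    gpu_ids = [0, 1, 2, 3]  # hypothetical device ids
    manager = multiprocessing.Manager()
    id_queue = manager.Queue()
    for gid in gpu_ids:
        id_queue.put(gid)
    # No more workers than devices, so none of them block in init.
    with multiprocessing.Pool(len(gpu_ids), init, (id_queue,)) as pool:
        print(pool.map(use_gpu, range(8)))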
It seems like what you want is simple: multiprocessing.current_process(). For example:
import multiprocessing

def f(x):
    # Each task reports which pool worker is running it.
    print(multiprocessing.current_process())
    return x * x

p = multiprocessing.Pool()
print(p.map(f, range(6)))
Output:
$ python foo.py
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-3, started daemon)>
<Process(PoolWorker-1, started daemon)>
<Process(PoolWorker-2, started daemon)>
<Process(PoolWorker-4, started daemon)>
[0, 1, 4, 9, 16, 25]
This returns the process object itself, so the process can be its own identity. You could also call id() on it for a unique numerical id -- in CPython, this is the memory address of the process object, so I don't think there's any possibility of overlap. Finally, you can use the ident or the pid property of the process -- but that's only set once the process is started.
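For example, a minimal sketch of reading the pid (and name) inside a worker function -- nothing here beyond what the standard library provides:

import multiprocessing

def f(x):
    # pid and ident are populated here because the pool has already started its workers.
    worker = multiprocessing.current_process()
    return (worker.pid, worker.name, x * x)

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        for pid, name, result in pool.map(f, range(8)):
            print(pid, name, result)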
Furthermore, looking over the source, it seems to me very likely that autogenerated names (as exemplified by the first value in the Process repr strings above) are unique. multiprocessing maintains an itertools.count object for every process, which is used to generate an _identity tuple for any child processes it spawns. So the top-level process produces child processes with single-value ids, they spawn processes with two-value ids, and so on. Then, if no name is passed to the Process constructor, it simply autogenerates the name based on the _identity, using ':'.join(...). Then Pool alters the name of the process using replace, leaving the autogenerated id the same.
The upshot of all this is that although two Processes may have the same name (because you may assign the same name to them when you create them), they are unique if you don't touch the name parameter. Also, you could theoretically use _identity as a unique identifier, but I gather they made that variable private for a reason!
An example of the above in action:
import multiprocessing

def f(x):
    created = multiprocessing.Process()
    current = multiprocessing.current_process()
    print('running:', current.name, current._identity)
    print('created:', created.name, created._identity)
    return x * x

p = multiprocessing.Pool()
print(p.map(f, range(6)))
Output:
$ python foo.py
running: PoolWorker-1 (1,)
created: Process-1:1 (1, 1)
running: PoolWorker-2 (2,)
created: Process-2:1 (2, 1)
running: PoolWorker-3 (3,)
created: Process-3:1 (3, 1)
running: PoolWorker-1 (1,)
created: Process-1:2 (1, 2)
running: PoolWorker-2 (2,)
created: Process-2:2 (2, 2)
running: PoolWorker-4 (4,)
created: Process-4:1 (4, 1)
[0, 1, 4, 9, 16, 25]
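If you'd rather have a small integer than a name or pid, one option is to parse the trailing counter off the autogenerated name. Note this relies on an assumption about the naming scheme: pool workers are named like PoolWorker-1 on older Pythons and ForkPoolWorker-1 / SpawnPoolWorker-1 on newer ones, so parse defensively:

import multiprocessing

def f(x):
    name = multiprocessing.current_process().name
    # The autogenerated name ends with the worker's counter, e.g. "ForkPoolWorker-3" -> 3.
    # Fall back to 0 if the name was customized and has no trailing number.
    try:
        worker_id = int(name.rsplit('-', 1)[-1])
    except ValueError:
        worker_id = 0
    return (worker_id, x * x)

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        print(pool.map(f, range(8)))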