What is the safest way to queue multiple threads originating in a loop?
To work around this problem, you can use a thread pool: you define a fixed number of worker threads, for example 5, and whenever a worker finishes a task, the next submitted task (tracked by a Future) automatically takes its place.
Example:
import concurrent.futures

def myFunction(line, param):
    print("Done with :", line, param)

param = "param_example"

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    with open("targets", "r") as listfile:
        for line in listfile:
            print("Starting a thread for: ", line)
            futures.append(executor.submit(myFunction, line=line, param=param))

    # wait for the threads to finish and maybe print a result:
    for future in concurrent.futures.as_completed(futures):
        print(future.result())  # an exception should be handled here!
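As the comment on the last line of the example hints, future.result() re-raises any exception that was raised inside the worker. Here is a minimal sketch of one way to handle that. The names process and run_all and the sample inputs are made up for illustration and are not part of the original example.

```python
import concurrent.futures


def process(item):
    # Hypothetical worker: fails on negative input to simulate an error.
    if item < 0:
        raise ValueError(f"negative input: {item}")
    return item * 2


def run_all(items, max_workers=5):
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the input that produced it.
        future_to_item = {executor.submit(process, item): item for item in items}
        for future in concurrent.futures.as_completed(future_to_item):
            item = future_to_item[future]
            try:
                # result() re-raises whatever the worker raised.
                results[item] = future.result()
            except ValueError as exc:
                errors[item] = str(exc)
    return results, errors


if __name__ == "__main__":
    results, errors = run_all([1, 2, -3])
    print("results:", results)
    print("errors:", errors)
```

Keeping a dict from future to input makes it possible to report which input failed, since as_completed yields futures in completion order rather than submission order.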
Queues are one way to do it. The way to use them is to put function parameters on a queue, and use threads to get them and do the processing.
The queue size doesn't matter too much in this case because reading the next line is fast. In another case, a more optimized solution would be to set the queue size to at least twice the number of threads. That way if all threads finish processing an item from the queue at the same time, they will all have the next item in the queue ready to be processed.
To avoid complicating the code, the threads can be created as daemon threads so that they don't keep the program from exiting once the processing is done. They are terminated when the program exits.
The alternative is to put a special item on the queue (like None) for each thread, make the threads exit after getting it from the queue, and then join the threads.
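The sentinel alternative could look roughly like the sketch below. It assumes None is never a real work item; run_with_sentinels and handler are hypothetical names chosen for this illustration.

```python
from queue import Queue
from threading import Thread


def run_with_sentinels(items, handler, workers=3):
    queue = Queue(workers * 2)

    def work():
        while True:
            item = queue.get()
            if item is None:  # sentinel: no more work for this thread
                break
            handler(item)

    threads = [Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()

    for item in items:
        queue.put(item)
    for _ in threads:
        queue.put(None)  # one sentinel per worker thread

    # Non-daemon threads: wait until each one has seen its sentinel.
    for t in threads:
        t.join()


if __name__ == "__main__":
    seen = []
    run_with_sentinels(range(10), seen.append, workers=3)
    print(sorted(seen))
```

Because every worker consumes exactly one sentinel before exiting, the number of sentinels has to match the number of threads. This sketch does not handle exceptions raised by handler; a worker that dies early would leave its sentinel unread.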
For the examples below, the number of worker threads is set using the workers variable.
Here is an example of a solution using a queue.
from queue import Queue
from threading import Thread

# bounded queue: put() blocks while the queue is full
queue = Queue(workers * 2)

def work():
    while True:
        myFunction(*queue.get())
        queue.task_done()

for _ in range(workers):
    Thread(target=work, daemon=True).start()

with open(targets, 'r') as listfile:
    for line in listfile:
        queue.put((line, param))

# block until task_done() has been called for every item
queue.join()
A simpler solution might be using ThreadPoolExecutor. It is especially simple in this case because the function being called doesn't return anything that needs to be used in the main thread.
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=workers) as executor:
    with open(targets, 'r') as listfile:
        for line in listfile:
            executor.submit(myFunction, line, param)
Also, if it's not a problem to have all lines stored in memory, there is a solution which doesn't use anything other than threads. The work is split in such a way that the threads read some lines from a list and ignore other lines. A simple example with two threads is where one thread reads odd lines and the other reads even lines.
from threading import Thread

with open(targets, 'r') as listfile:
    lines = listfile.readlines()

def work_split(n):
    for line in lines[n::workers]:
        myFunction(line, param)

threads = []
for n in range(workers):
    t = Thread(target=work_split, args=(n,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()
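To see how the slice lines[n::workers] divides the work, here is a small sketch with a made-up list of seven items and three workers. split_for is a hypothetical helper used only for this illustration.

```python
def split_for(lines, workers):
    # Worker n takes items n, n + workers, n + 2 * workers, ...
    return [lines[n::workers] for n in range(workers)]


if __name__ == "__main__":
    for n, chunk in enumerate(split_for(list("abcdefg"), 3)):
        print(n, chunk)
    # 0 ['a', 'd', 'g']
    # 1 ['b', 'e']
    # 2 ['c', 'f']
```

Every item lands in exactly one slice, so no locking or queue is needed, but the split is static: a thread that draws the slow items cannot hand work over to an idle one.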
I have done a quick benchmark and the Queue is slightly faster than the ThreadPoolExecutor, but the solution with the split work is faster than both.
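This is not the benchmark I ran, but a rough sketch of how such a comparison could be set up. The task function is a hypothetical stand-in for myFunction, and the line count and worker count are arbitrary assumptions; timings will vary with the real workload and machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Thread


def task(line, param):
    # Hypothetical stand-in for myFunction; replace with the real work.
    return len(line) + len(param)


def with_queue(lines, param, workers):
    queue = Queue(workers * 2)

    def work():
        while True:
            task(*queue.get())
            queue.task_done()

    for _ in range(workers):
        Thread(target=work, daemon=True).start()
    for line in lines:
        queue.put((line, param))
    queue.join()


def with_executor(lines, param, workers):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for line in lines:
            executor.submit(task, line, param)


def with_split(lines, param, workers):
    def work_split(n):
        for line in lines[n::workers]:
            task(line, param)

    threads = [Thread(target=work_split, args=(n,)) for n in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def time_it(func, lines, param="param_example", workers=5):
    # Wall-clock time for one run of the given variant.
    start = time.perf_counter()
    func(lines, param, workers)
    return time.perf_counter() - start


if __name__ == "__main__":
    lines = [f"line {i}\n" for i in range(10_000)]
    for func in (with_queue, with_executor, with_split):
        print(f"{func.__name__}: {time_it(func, lines):.4f} s")
```

With a task this cheap, the measurement mostly reflects the coordination overhead of each approach. For slow or I/O-bound work the differences are likely to shrink, so it is worth timing with the actual function.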