The right way to limit the maximum number of threads running at once?
A semaphore is a variable or abstract data type used to control access to a common resource by multiple processes or threads in a concurrent system, such as a multiprogramming operating system; it can help you here.
import threading

maximumNumberOfThreads = 5  # e.g. allow at most 5 threads to run at once

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

class MyThread(threading.Thread):
    def run(self):
        threadLimiter.acquire()
        try:
            self.Executemycode()
        finally:
            threadLimiter.release()

    def Executemycode(self):
        print("Hello World!")
        # <your code here>
This way you can easily limit the number of threads that run concurrently during program execution. The variable maximumNumberOfThreads sets the upper bound on how many threads may execute at the same time.
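For instance, a minimal sketch of how the class above might be exercised (the count of 20 threads is an arbitrary choice for illustration):

    # Start many threads; the bounded semaphore ensures that at most
    # maximumNumberOfThreads of them execute Executemycode() at a time.
    threads = [MyThread() for _ in range(20)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()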
It sounds like you want to implement the producer/consumer pattern with eight workers. Python has a Queue class for this purpose, and it is thread-safe.
Each worker should call get() on the queue to retrieve a task. This call will block if no tasks are available, causing the worker to go idle until one becomes available. The worker should then execute the task and finally call task_done() on the queue.
You would put tasks in the queue by calling put() on the queue.
From the main thread, you can call join() on the queue to wait until all pending tasks have been completed.
This approach has the benefit that you are not creating and destroying threads, which is expensive. The worker threads will run continuously, but will be asleep when no tasks are in the queue, using zero CPU time.
(The linked documentation page has an example of this very pattern.)
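A minimal sketch of that pattern might look like the following; the worker count of 8, the number of tasks, and the do_work placeholder are assumptions for illustration:

    import threading
    from queue import Queue

    task_queue = Queue()

    def do_work(task):
        # Placeholder for whatever each task actually does.
        print("processing", task)

    def worker():
        while True:
            task = task_queue.get()   # blocks until a task is available
            try:
                do_work(task)
            finally:
                task_queue.task_done()

    # Eight long-lived workers; daemon=True lets the program exit once main finishes.
    for _ in range(8):
        threading.Thread(target=worker, daemon=True).start()

    for i in range(100):
        task_queue.put(i)

    task_queue.join()  # wait until every submitted task has been processed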
It would be much easier to implement this as a thread pool or executor, using either multiprocessing.dummy.Pool or concurrent.futures.ThreadPoolExecutor (or, if using Python 2.x, the backport futures). For example:
import concurrent.futures

def f(arg):
    print("Started a task. arg=%s" % (arg,))
    for i in range(100000):
        pass
    print("Done")

with concurrent.futures.ThreadPoolExecutor(8) as executor:
    while True:
        arg = get_task()  # your existing pull-model task source
        executor.submit(f, arg)
Of course, if you can change the pull-model get_task to a push-model get_tasks that, e.g., yields tasks one at a time, this is even simpler:
with concurrent.futures.ThreadPoolExecutor(8) as executor:
    for arg in get_tasks():
        executor.submit(f, arg)
When you run out of tasks (e.g., get_task raises an exception, or get_tasks runs dry), this will automatically tell the executor to stop after it drains the queue, wait for it to stop, and clean up everything.
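To make that concrete, here is a self-contained sketch with a hypothetical get_tasks generator and a trivial f, showing the automatic drain-and-shutdown when the with block is exited:

    import concurrent.futures

    def get_tasks():
        # Hypothetical push-model source: yields tasks one at a time, then runs dry.
        for i in range(20):
            yield i

    def f(arg):
        print("working on", arg)

    with concurrent.futures.ThreadPoolExecutor(8) as executor:
        for arg in get_tasks():
            executor.submit(f, arg)
    # Exiting the with block shuts the executor down only after all
    # submitted tasks have completed.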
I ran into this same problem and spent days (two, to be precise) getting to a working solution using a queue. I wasted a day going down the ThreadPoolExecutor path because I could not get it to limit the number of threads it launched: I fed it a list of 5000 files to copy and the code went non-responsive once it reached about 1500 concurrent file copies. In my experience, the max_workers parameter on the ThreadPoolExecutor controlled how many workers were spinning up threads, not how many threads got spun up.
Ok, anyway, here is a very simple example of using a Queue for this:
import threading, time, random
from queue import Queue

jobs = Queue()

def do_stuff(q):
    while not q.empty():
        value = q.get()
        time.sleep(random.randint(1, 10))  # simulate a slow task
        print(value)
        q.task_done()

# Fill the queue before starting the workers.
for i in range(10):
    jobs.put(i)

# Three workers, so at most three jobs run at once.
# daemon=True so a worker left blocked on get() cannot prevent the program from exiting.
for i in range(3):
    worker = threading.Thread(target=do_stuff, args=(jobs,), daemon=True)
    worker.start()

print("waiting for queue to complete", jobs.qsize(), "tasks")
jobs.join()
print("all done")