Python - What is queue.task_done() used for?
"Read the source, Luke!" -- Obi-one Codobi
The source for asyncio.Queue is pretty short:
- The number of unfinished tasks goes up by one when you put() to the queue.
- It goes down by one when you call task_done().
- join() awaits there being no unfinished tasks.
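A stripped-down model of that bookkeeping may make the three rules concrete. ToyQueue is a hypothetical class, loosely modelled on the counter-plus-event approach in CPython's asyncio/queues.py but heavily simplified (no maxsize, no blocking get()); it is a sketch, not the real implementation.

```python
import asyncio
import collections


class ToyQueue:
    """Hypothetical, simplified model of asyncio.Queue's task accounting."""

    def __init__(self):
        self._items = collections.deque()
        self._unfinished_tasks = 0
        self._finished = asyncio.Event()
        self._finished.set()  # nothing outstanding yet

    def put_nowait(self, item):
        self._items.append(item)
        self._unfinished_tasks += 1  # rule 1: every put raises the counter
        self._finished.clear()

    def get_nowait(self):
        # Note: taking an item out does NOT touch the counter
        return self._items.popleft()

    def task_done(self):
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1  # rule 2: only task_done lowers it
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        # rule 3: wait until every put() has been matched by a task_done()
        await self._finished.wait()


async def demo():
    q = ToyQueue()
    q.put_nowait('a')
    q.get_nowait()                # the queue is empty now...
    print(q._unfinished_tasks)    # ...but one task is still unfinished
    q.task_done()
    await q.join()                # returns immediately now
    print('joined')


if __name__ == '__main__':
    asyncio.run(demo())
```

The ToyQueue is created inside a coroutine so that its Event belongs to the running event loop.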
This counting makes join() useful if and only if you are calling task_done(). Using the classic bank analogy:
- People come in the doors and get in line; the door is a producer doing q.put().
- When a teller is idle and a person is in line, the person goes to that teller's window; the teller does q.get().
- When the teller has finished helping the person, they are ready for the next one; the teller does q.task_done().
- At 5 p.m., the doors are locked; the door task finishes.
- You wait until both the line is empty and each teller has finished helping the person in front of them: await q.join()
- Then you send the tellers home, who are now all idling with an empty queue: for teller in tellers: teller.cancel()
Without task_done(), you cannot know that every teller is done with their customer, and you cannot send a teller home while they still have a person at their window.
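The bank analogy maps almost line for line onto asyncio code. This is a minimal sketch: door, teller and bank_day are hypothetical names, and asyncio.sleep() stands in for actually helping a customer.

```python
import asyncio


async def door(q, customers):
    # Producer: customers walk in and get in line
    for customer in customers:
        await q.put(customer)


async def teller(name, q, served):
    # Consumer: loops forever; it is only ever stopped by cancel()
    while True:
        customer = await q.get()          # next person in line comes to the window
        try:
            await asyncio.sleep(0.01)     # "helping" the customer
            served.append((name, customer))
        finally:
            q.task_done()                 # this window is free again


async def bank_day(num_customers=20, num_tellers=3):
    q = asyncio.Queue()
    served = []
    tellers = [asyncio.create_task(teller(f'teller-{i}', q, served))
               for i in range(num_tellers)]
    await door(q, range(num_customers))   # 5 p.m.: the door task finishes
    await q.join()                        # line is empty AND every window is idle
    for t in tellers:                     # send the tellers home
        t.cancel()
    # Let the cancellations finish; the CancelledErrors are collected, not raised
    await asyncio.gather(*tellers, return_exceptions=True)
    return served


if __name__ == '__main__':
    served = asyncio.run(bank_day())
    print(len(served), 'customers served')
```

Because join() only returns after the last task_done(), no teller is cancelled while a customer is still at their window.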
.task_done() is how you tell .join() that the processing of an item is done.
💡 If you use .join() and don't call .task_done() for every processed item, your script will hang forever.
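The hang is easy to observe. queue.Queue.join() has no timeout parameter, so this sketch runs it in a helper daemon thread and checks whether that thread finished in time; join_finishes is a hypothetical helper written only for this demonstration.

```python
import queue
import threading


def join_finishes(q, timeout=0.5):
    """Hypothetical helper: run q.join() in a daemon thread and
    report whether it returned within `timeout` seconds."""
    joiner = threading.Thread(target=q.join, daemon=True)
    joiner.start()
    joiner.join(timeout)
    return not joiner.is_alive()


q = queue.Queue()
q.put('job')
q.get()                   # the queue is now empty...
print(join_finishes(q))   # False: join() is still blocked
q.task_done()             # ...only this marks the item as processed
print(join_finishes(q))   # True
```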
Ain't nothin' like a short example:
import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = False


def items_queue_worker():
    # Keep pulling items until the main thread clears the `running` flag
    while running:
        try:
            item = items_queue.get(timeout=0.01)
        except queue.Empty:
            # Nothing to do right now; loop and re-check `running`
            continue
        try:
            if item is not None:
                process_item(item)
        except Exception:
            logging.exception('error while processing item')
        finally:
            # Every successful get() must be matched by exactly one task_done(),
            # otherwise items_queue.join() never returns
            items_queue.task_done()


def process_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))


if __name__ == '__main__':
    running = True
    # Create 10 items_queue_worker threads
    worker_threads = 10
    for _ in range(worker_threads):
        threading.Thread(target=items_queue_worker).start()
    # Populate your queue with data
    for i in range(100):
        items_queue.put(i)
    # Wait for all items to finish processing
    items_queue.join()
    # Tell the workers to leave their loops
    running = False
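A common alternative to polling a global running flag is to put one sentinel per worker after join() returns, which is also the pattern used in older versions of the queue module documentation. This is a sketch: STOP, worker and run are hypothetical names, and it assumes None is never a real work item.

```python
import queue
import threading

STOP = None  # sentinel; assumes None is never a real work item


def worker(q, results):
    while True:
        item = q.get()
        try:
            if item is STOP:
                return                   # leave the loop; finally still calls task_done()
            results.append(item * item)  # stand-in for real processing
        finally:
            q.task_done()


def run(items, num_workers=4):
    q = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(q, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    q.join()              # every real item has been processed
    for _ in threads:
        q.put(STOP)       # one sentinel per worker
    for t in threads:
        t.join()          # workers exit without busy-polling
    return results


if __name__ == '__main__':
    print(sorted(run(range(10))))
```

Each worker exits after taking exactly one sentinel, so the threads block in get() instead of waking up every 10 ms to check a flag.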
Queue.task_done is not there for the workers' benefit. It is there to support Queue.join.
If I give you a box of work assignments, do I care about when you've taken everything out of the box?
No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys might still be working on stuff you took out of the box.
Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with Queue.join will wait until enough task_done calls have been made, not merely until the queue is empty.
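The box analogy can be reproduced directly: six workers empty the box almost immediately, but join() keeps waiting until all six task_done() calls have been made. The names box, done and worker are hypothetical, and the first print depends on thread timing, so its output is only typical.

```python
import queue
import threading
import time

box = queue.Queue()
done = []  # list.append is thread-safe in CPython, so the workers can share it


def worker():
    while True:
        item = box.get()      # take an assignment out of the box
        time.sleep(0.2)       # ...and keep working on it for a while
        done.append(item)
        box.task_done()       # only now is this assignment really finished


# "You and 5 other guys": six daemon workers that idle once the box is empty
for _ in range(6):
    threading.Thread(target=worker, daemon=True).start()

for i in range(6):
    box.put(i)

time.sleep(0.05)
# Typically "True 0": the box is already empty, yet no work has finished
print(box.empty(), len(done))

box.join()                     # waits for six task_done() calls, not an empty box
print(box.empty(), len(done))  # True 6
```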
eigenfield points out in the comments that it seems really weird for a queue to have task_done/join methods. That's true, but it's really a naming problem. The queue module has bad name choices that make it sound like a general-purpose queue library, when it's really a thread communication library.
It'd be weird for a general-purpose queue to have task_done/join methods, but it's entirely reasonable for an inter-thread message channel to have a way to indicate that messages have been processed. If the class was called thread_communication.MessageChannel instead of queue.Queue and task_done was called message_processed, the intent would be a lot clearer.
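To make the naming argument concrete, here is a hypothetical MessageChannel that simply renames queue.Queue's methods. It is not part of the standard library; it only shows how the same API reads under channel-style names.

```python
import queue


class MessageChannel(queue.Queue):
    """Hypothetical rename of queue.Queue, for illustration only."""

    def send(self, message):
        self.put(message)

    def receive(self):
        return self.get()

    def message_processed(self):
        # Same as task_done(): the message taken by receive() is fully handled
        self.task_done()

    def wait_until_all_processed(self):
        # Same as join(): block until every sent message has been processed
        self.join()


channel = MessageChannel()
channel.send('hello')
msg = channel.receive()
channel.message_processed()
channel.wait_until_all_processed()  # returns at once: nothing is outstanding
```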
(If you need a general-purpose queue rather than an inter-thread message channel, use collections.deque.)
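For comparison, a minimal collections.deque used as a plain FIFO queue: there is no unfinished-task counter, no task_done() and no join().

```python
from collections import deque

line = deque()
line.append('a')          # enqueue on the right
line.append('b')
line.appendleft('z')      # deques also work at the left end
first = line.popleft()    # dequeue from the left
print(first, list(line))  # z ['a', 'b']
# popleft() on an empty deque raises IndexError instead of blocking like Queue.get()
```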