Python - appending to same file from multiple threads

the fact that you never see jumbled text on the same line or newlines in the middle of a line is a clue that you actually don't need to synchronize appending to the file. the problem is that you use print to write to a single file handle. i suspect print is actually doing 2 operations on the file handle in one call and those operations are racing between the threads. basically print is doing something like:

file_handle.write('whatever_text_you_pass_it')
file_handle.write('\n')

and because different threads are doing this simultaneously on the same file handle, sometimes one thread will get in its first write, then another thread will get in its first write, and then you'll get two newlines in a row. or really any permutation of these.
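if you want to check that suspicion yourself, here's a small sketch. RecordingFile and record_print are names made up for this illustration, they aren't part of any library. the idea is to hand print a fake file object that just records every write call it receives. on CPython the text and the line ending show up as two separate calls, and the gap between them is where another thread can slip in:

```python
import sys


class RecordingFile:
    """Hypothetical stand-in for a file handle; it only records write calls."""

    def __init__(self):
        self.calls = []

    def write(self, text):
        # Remember each chunk exactly as print hands it over.
        self.calls.append(text)
        return len(text)


def record_print(text):
    """Return the list of separate write calls that print makes for text."""
    recorder = RecordingFile()
    print(text, file=recorder)
    return recorder.calls


if __name__ == '__main__':
    # Show the individual chunks print passed to write.
    sys.stdout.write(repr(record_print('hello')) + '\n')
```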

the simplest way to get around this is to stop using print and just use write directly. try something like this:

output.write(f + '\n')

this still seems dangerous to me. i'm not sure what guarantees you can expect with all the threads using the same file handle object and contending for its internal buffer. personally i'd sidestep the whole issue and just have every thread get its own file handle. also note that this only works when the file is line-buffered, so that every flush to the file ends on a line ending. regular files are not line-buffered by default (only interactive streams are), so to force line buffering pass 1 as the third argument of open. you can test it out like this:

#!/usr/bin/env python
import sys
import threading

def hello(file_name, message, count):
  # each thread opens its own line-buffered handle (buffering=1) in append mode
  with open(file_name, 'a', 1) as f:
    for i in range(count):
      # text mode translates '\n' to the platform line ending
      f.write(message + '\n')

if __name__ == '__main__':
  # start a file
  with open('some.txt', 'w') as f:
    f.write('this is the beginning\n')
  # make 10 threads write a million lines to the same file at the same time
  threads = []
  for i in range(10):
    threads.append(threading.Thread(target=hello, args=('some.txt', 'hey im thread %d' % i, 1000000)))
    threads[-1].start()
  for t in threads:
    t.join()
  # collect the distinct lines; any jumbled or split line would show up here
  uniq_lines = set()
  with open('some.txt', 'r') as f:
    for l in f:
      uniq_lines.add(l)
  for u in uniq_lines:
    sys.stdout.write(u)

The output looks like this:

hey im thread 6
hey im thread 7
hey im thread 9
hey im thread 8
hey im thread 3
this is the beginning
hey im thread 5
hey im thread 4
hey im thread 1
hey im thread 0
hey im thread 2

And maybe some more newlines where they shouldn't be?

Keep in mind that a shared resource should not be accessed by more than one thread at a time; otherwise the results are unpredictable. Access has to be serialized, for example with a lock, so that each write behaves as a single atomic operation.

Take a look at this page for a little intuition: Thread Synchronization Mechanisms in Python
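As a rough illustration of that idea, here is a minimal sketch that guards a shared file handle with threading.Lock. The names write_lock, append_lines and run_locked_demo, the file name and the thread counts are all assumptions made up for this example; it is one possible way to serialize the writes, not the only one.

```python
import os
import tempfile
import threading

# One lock shared by every thread that writes to the handle (illustrative name).
write_lock = threading.Lock()


def append_lines(handle, message, count):
    """Write message to the shared handle count times, one line per write."""
    for _ in range(count):
        # Only one thread at a time may run the write below.
        with write_lock:
            handle.write(message + '\n')


def run_locked_demo(path, thread_count=4, lines_per_thread=1000):
    """Start thread_count writers on one handle and return the lines written."""
    with open(path, 'a') as handle:
        threads = [
            threading.Thread(target=append_lines,
                             args=(handle, 'thread %d' % i, lines_per_thread))
            for i in range(thread_count)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    with open(path) as handle:
        return handle.read().splitlines()


if __name__ == '__main__':
    demo_path = os.path.join(tempfile.mkdtemp(), 'locked.txt')
    lines = run_locked_demo(demo_path)
    print(len(lines), 'lines,', len(set(lines)), 'distinct')
```

Because the lock is held around each complete line, no thread can interleave its output with another thread's line.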


The solution is to write to the file in one thread only.

import os
import queue  # named Queue in Python 2
import threading

class PrintThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        # only this thread ever writes to output
        for path, dirs, files in os.walk(p):
            for f in files:
                print(f, file=output)

    def run(self):
        while True:
            result = self.queue.get()
            self.printfiles(result)
            self.queue.task_done()

class ProcessThread(threading.Thread):
    def __init__(self, in_queue, out_queue):
        threading.Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            path = self.in_queue.get()
            result = self.process(path)
            self.out_queue.put(result)
            self.in_queue.task_done()

    def process(self, path):
        # Do the processing job here; this placeholder passes the path through unchanged
        return path

pathqueue = queue.Queue()
resultqueue = queue.Queue()
paths = getThisFromSomeWhere()  # placeholder: supply your own iterable of paths

output = open('file', 'a')

# spawn threads to process
for i in range(0, 5):
    t = ProcessThread(pathqueue, resultqueue)
    t.daemon = True
    t.start()

# spawn the single thread that prints
t = PrintThread(resultqueue)
t.daemon = True
t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for both queues to be fully processed
pathqueue.join()
resultqueue.join()

# flush and close the file once everything has been written
output.close()
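For experimenting with the single-writer idea without the placeholders above, here is a smaller self-contained sketch. It swaps in a different shutdown technique: instead of daemon threads and queue.join(), a sentinel object tells the writer to stop. The names STOP, writer, worker and run_single_writer, and the line contents, are assumptions made up for this illustration.

```python
import os
import queue
import tempfile
import threading

STOP = object()  # sentinel that tells the writer thread to finish


def writer(result_queue, file_name):
    """The only thread that ever touches the file."""
    with open(file_name, 'a') as output:
        while True:
            item = result_queue.get()
            if item is STOP:
                break
            output.write(item + '\n')


def worker(worker_id, count, result_queue):
    """Produce count lines and hand them to the writer through the queue."""
    for n in range(count):
        result_queue.put('worker %d line %d' % (worker_id, n))


def run_single_writer(file_name, worker_count=5, lines_per_worker=100):
    """Run the workers plus one writer and return the lines in the file."""
    result_queue = queue.Queue()
    writer_thread = threading.Thread(target=writer,
                                     args=(result_queue, file_name))
    writer_thread.start()
    workers = [
        threading.Thread(target=worker, args=(i, lines_per_worker, result_queue))
        for i in range(worker_count)
    ]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    # Every worker has finished, so the sentinel is the last item queued.
    result_queue.put(STOP)
    writer_thread.join()
    with open(file_name) as f:
        return f.read().splitlines()


if __name__ == '__main__':
    demo_path = os.path.join(tempfile.mkdtemp(), 'single_writer.txt')
    print(len(run_single_writer(demo_path)), 'lines written')
```

Since the writer leaves its with block before it exits, the file is flushed and closed by the time writer_thread.join() returns.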