Python: Is there any way to let one process hold a write lock while others just read in parallel?

I found these Python versions of a reader-writer lock:

A reader-writer lock for Python

Reader-Writer lock with priority for writers (Python recipe)


Just another rwlock I've rolled for my own use. Although I haven't used it in heavily loaded production code, it has worked for my use cases, and it was written by mirroring the code of a C++ class that has been used and tested on a Linux server for about a year. It doesn't have an upgrade-from-read-to-write method...

import logging
import threading

class RWLock:
    """Non-reentrant, write-preferring reader-writer lock.

    Any number of readers may hold the lock at the same time, but a writer
    holds it exclusively.  As soon as a writer has registered itself (even if
    it is still waiting), newly arriving readers block until every registered
    writer has finished.

    Use ``acquire_read()``/``release_read()`` and
    ``acquire_write()``/``release_write()`` directly, or the ``read_access``
    and ``write_access`` attributes with the ``with`` statement.

    Set the class attribute ``DEBUG`` to a true value *before* creating an
    instance to enable per-thread ownership assertions.
    """
    DEBUG = 0

    class _ReadAccess:
        """``with``-statement helper that holds read access for its body."""

        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_read()
            return self.rwlock

        def __exit__(self, exc_type, exc_value, tb):
            self.rwlock.release_read()

    class _WriteAccess:
        """``with``-statement helper that holds write access for its body."""

        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_write()
            return self.rwlock

        def __exit__(self, exc_type, exc_value, tb):
            self.rwlock.release_write()

    def __init__(self):
        # Guards every counter below and backs both condition variables.
        self.lock = threading.Lock()

        # Serializes the writers among themselves.
        self.active_writer_lock = threading.Lock()
        # The total number of writers including the active writer and
        # those blocking on active_writer_lock or readers_finished_cond.
        self.writer_count = 0

        # Number of readers that are blocking on writers_finished_cond.
        self.waiting_reader_count = 0

        # Number of readers currently using the resource.
        self.active_reader_count = 0

        self.readers_finished_cond = threading.Condition(self.lock)
        self.writers_finished_cond = threading.Condition(self.lock)

        # Support for the with statement.
        self.read_access = self._ReadAccess(self)
        self.write_access = self._WriteAccess(self)

        if self.DEBUG:
            self.active_readers = set()
            self.active_writer = None

    def acquire_read(self):
        """Block until no writer is registered, then take read access."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me not in self.active_readers, 'This thread has already acquired read access and this lock isn\'t reader-reentrant!'
                assert me != self.active_writer, 'This thread already has write access, release that before acquiring read access!'
                self.active_readers.add(me)
            if self.writer_count:
                self.waiting_reader_count += 1
                # Even if the last writer thread notifies us it can happen
                # that a new incoming writer thread acquires the lock earlier
                # than this reader thread, so writer_count is re-tested after
                # every wait().  The loop also guards against spurious
                # wakeups.
                while self.writer_count:
                    self.writers_finished_cond.wait()
                self.waiting_reader_count -= 1
            self.active_reader_count += 1

    def release_read(self):
        """Give up read access; wake waiting writers if this was the last reader."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me in self.active_readers, 'Trying to release read access when it hasn\'t been acquired by this thread!'
                self.active_readers.remove(me)
            assert self.active_reader_count > 0
            self.active_reader_count -= 1
            if not self.active_reader_count and self.writer_count:
                self.readers_finished_cond.notify_all()

    def acquire_write(self):
        """Register as a writer, wait for active readers to leave, then take
        exclusive write access."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me not in self.active_readers, 'This thread already has read access - release that before acquiring write access!'
                assert me != self.active_writer, 'This thread already has write access and this lock isn\'t writer-reentrant!'
            # Registering first is what keeps new readers out from now on.
            self.writer_count += 1
            while self.active_reader_count:
                self.readers_finished_cond.wait()

        # Taken outside self.lock so that other threads can keep updating the
        # counters while this writer queues behind the active writer.
        self.active_writer_lock.acquire()
        if self.DEBUG:
            self.active_writer = me

    def release_write(self):
        """Give up write access; wake waiting readers if no writer is left."""
        if not self.DEBUG:
            self.active_writer_lock.release()
        with self.lock:
            if self.DEBUG:
                # In debug mode ownership is verified before the writer lock
                # is released.
                me = threading.current_thread()
                assert me == self.active_writer, 'Trying to release write access when it hasn\'t been acquired by this thread!'
                self.active_writer = None
                self.active_writer_lock.release()
            assert self.writer_count > 0
            self.writer_count -= 1
            if not self.writer_count and self.waiting_reader_count:
                self.writers_finished_cond.notify_all()

    def get_state(self):
        """Return ``(writer_count, waiting_reader_count, active_reader_count)``
        as one consistent snapshot."""
        with self.lock:
            return (self.writer_count, self.waiting_reader_count, self.active_reader_count)



if __name__ == '__main__':
    # Timed demo: starts reader/writer threads on a fixed schedule and prints
    # what each one observes, so the lock's ordering can be checked by eye.
    import time
    lock = RWLock()
    start_time = time.time()

    # Keeps the output lines of concurrent threads from interleaving.
    print_lock = threading.Lock()

    def p(msg):
        """Print msg prefixed with the elapsed time and the thread's id."""
        with print_lock:
            # A single parenthesized argument prints identically on
            # Python 2 and Python 3.
            print('%5.2f [%2s] %-15s' % (time.time()-start_time, threading.current_thread().myid, msg))

    def p_state(msg):
        """Like p(), but also append the lock's current counters."""
        with print_lock:
            print('%5.2f [%2s] %-15s writer_count=%s waiting_reader_count=%s active_reader_count=%s' %
                  ((time.time()-start_time, threading.current_thread().myid, msg) + lock.get_state()))

    def w():
        """Thread body: hold write access for the thread's configured time."""
        p('write wait...')
        with lock.write_access:
            p_state('write started.')
            time.sleep(threading.current_thread().mytimeout)
        p_state('write ended.')

    def r():
        """Thread body: hold read access for the thread's configured time."""
        p('read wait...')
        with lock.read_access:
            p_state('read started.')
            time.sleep(threading.current_thread().mytimeout)
        p_state('read ended.')

    def start_thread(thread_id, func, timeout):
        """Start a thread running func, tagged with its id and hold time."""
        thread = threading.Thread(target=func)
        thread.myid = thread_id
        thread.mytimeout = timeout
        thread.start()
        return thread

    TEST_LOCKS = [
        # (id, start_time, duration, r/w); a negative id prints a separator.

        # Testing the branches of acquire_read() and release_read()
        (1, 0, 1, r),
        (2, 0.1, 0.5, r),

        (-1, 2, 0, 0),
        (3, 2, 0.5, w),
        (4, 2.1, 0.5, w),
        (5, 2.1, 1, r),
        (6, 2.1, 1, r),
        (7, 2.2, 0.1, w),

        (-1, 5, 0, 0),
        (8, 5, 0.5, r),
        (9, 5.1, 0.5, w),
        (10, 5.1, 0.5, w),

        # Testing the branches of acquire_write() and release_write()
        (-1, 8, 0, 0),
        (11, 8, 1, w),
        (12, 8.1, 0.5, w),

        (-1, 10, 0, 0),
        (13, 10, 0.5, r),
        (14, 10.1, 0.5, w),
        (15, 10.1, 0.5, r),
        (16, 10.2, 0.5, r),
        (17, 10.3, 0.5, w),
    ]

    # p() reads .myid from whichever thread prints, including the main one.
    threading.current_thread().myid = 0

    t = 0
    for thread_id, start, duration, rw in sorted(TEST_LOCKS, key=lambda x: x[1]):
        # Sleep only for the gap since the previously scheduled entry.
        time.sleep(start - t)
        t = start
        if thread_id < 0:
            p('-----------------------------')
        else:
            start_thread(thread_id, rw, duration)

One more implementation of the ReadWriteLock. It takes care of the writer-starvation issue and supports promotion of a read lock to a write lock, if requested during construction. It uses only one Lock and a Condition.

# From O'Reilly Python Cookbook by David Ascher, Alex Martelli
# With changes to cover the starvation situation where a continuous
#   stream of readers may starve a writer, Lock Promotion and Context Managers

class ReadWriteLock:
  """A lock object that allows many simultaneous "read locks", but
  only one "write lock".

  A writer registers itself before it waits for the readers to leave, and
  readers do not enter while any writer is registered, so a continuous
  stream of readers cannot starve a writer.

  If ``withPromotion`` is true, a thread that already holds a read lock may
  call ``acquire_write()`` to upgrade; the upgrade is granted once every
  thread that still holds a read lock is itself requesting the write lock.
  """

  def __init__(self, withPromotion=False):
    # The re-entrant lock behind this condition is held by the writer for
    # the whole duration of its write.
    self._read_ready = threading.Condition(threading.RLock())
    self._readers = 0
    self._writers = 0
    self._promote = withPromotion
    self._readerList = []  # List of Reader thread IDs
    self._writerList = []  # List of Writer thread IDs

  def acquire_read(self):
    """Acquire a read lock. Blocks while any thread holds or is waiting
    for the write lock."""
    logging.debug("RWL : acquire_read()")
    with self._read_ready:
      while self._writers > 0:
        self._read_ready.wait()
      # Register the reader only once the wait has really succeeded, so an
      # interrupted wait leaves no stale entry behind.
      self._readers += 1
      self._readerList.append(threading.get_ident())

  def release_read(self):
    """Release a read lock.

    Raises ValueError if the calling thread holds no read lock; in that
    case the reader count is left untouched.
    """
    logging.debug("RWL : release_read()")
    with self._read_ready:
      # Remove first: if this thread is not a reader the ValueError is
      # raised before the counter is modified.
      self._readerList.remove(threading.get_ident())
      self._readers -= 1
      # With promotion enabled, a waiting writer may become eligible as soon
      # as the set of readers changes, not only when it becomes empty.
      if not self._readers or (self._promote and self._writers):
        self._read_ready.notify_all()

  def acquire_write(self):
    """Acquire a write lock. Blocks until there are no acquired read or
    write locks (or, with promotion, until every remaining reader is also
    requesting the write lock)."""
    logging.debug("RWL : acquire_write()")
    me = threading.get_ident()
    self._read_ready.acquire()   # A re-entrant lock lets a thread re-acquire the lock
    self._writers += 1
    self._writerList.append(me)
    try:
      while self._readers > 0:
        # promote to write lock, only if all the readers are trying to promote to writer
        # If there are other reader threads, then wait till they complete reading
        if self._promote and me in self._readerList and set(self._readerList).issubset(self._writerList):
          break
        self._read_ready.wait()
    except BaseException:
      # The wait was interrupted: undo the registration and give the lock
      # back, otherwise every other thread would stay blocked forever.
      self._writers -= 1
      self._writerList.remove(me)
      self._read_ready.notify_all()
      self._read_ready.release()
      raise

  def release_write(self):
    """Release a write lock.

    Raises ValueError if the calling thread is not registered as a writer;
    in that case the writer count is left untouched.
    """
    logging.debug("RWL : release_write()")
    # Remove first so a bogus call fails before the counter is modified.
    self._writerList.remove(threading.get_ident())
    self._writers -= 1
    self._read_ready.notify_all()
    self._read_ready.release()

#----------------------------------------------------------------------------------------------------------

class ReadRWLock:
  """Context manager that holds a read lock on the wrapped lock object.

  Entering the ``with`` block calls ``acquire_read()`` on the wrapped
  object; leaving it calls ``release_read()``.
  """

  def __init__(self, rwLock):
    self.rwLock = rwLock

  def __enter__(self):
    guarded = self.rwLock
    guarded.acquire_read()
    # Returning the manager itself is optional, but lets callers use "as".
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    guarded = self.rwLock
    guarded.release_read()
    # A false result tells Python not to swallow an in-flight exception.
    return False

#----------------------------------------------------------------------------------------------------------

class WriteRWLock:
  """Context manager that holds a write lock on the wrapped lock object.

  Entering the ``with`` block calls ``acquire_write()`` on the wrapped
  object; leaving it calls ``release_write()``.
  """

  def __init__(self, rwLock):
    self.rwLock = rwLock

  def __enter__(self):
    guarded = self.rwLock
    guarded.acquire_write()
    # Returning the manager itself is optional, but lets callers use "as".
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    guarded = self.rwLock
    guarded.release_write()
    # A false result tells Python not to swallow an in-flight exception.
    return False

#----------------------------------------------------------------------------------------------------------