Increment counter for every access to a Flask view
There is a small gotcha in the accepted answer from @davidism. The multiprocessing.Value is accessed outside of the lock, so there is still a chance for duplicate values if you are unlucky.
Here is an example showing that collision. It also shows how the collision is possible if you are using asynchronous code (asyncio has its own locking mechanisms).
import asyncio
import concurrent.futures
import time
from multiprocessing import Value

# Sleep timings that can cause value collisions outside of the lock context manager
TIMINGS = [(0, 0), (1, 1), (0, 2)]

# The workers share this Value only when they are forked from this process
# (the "fork" start method, e.g. the Linux default in older Python versions);
# under "spawn" each worker would re-create its own counter.
counter = Value('i', 0)


def incr_counter(pre_incr_sleep, pre_return_sleep):
    """Return outside of the locked context (this can return duplicates)."""
    time.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
    time.sleep(pre_return_sleep)
    return counter.value


def incr_counter_context(pre_incr_sleep, pre_return_sleep):
    """Return inside of the locked context (this never returns duplicates)."""
    time.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
        time.sleep(pre_return_sleep)
        return counter.value


async def aincr_counter(pre_incr_sleep, pre_return_sleep):
    """Return outside of the locked context (this should multi-increment in some scenarios)."""
    await asyncio.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
    await asyncio.sleep(pre_return_sleep)
    return counter.value


async def aincr_counter_context(pre_incr_sleep, pre_return_sleep):
    """Return inside of the locked context (you would expect no multi-increments here)."""
    await asyncio.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
        await asyncio.sleep(pre_return_sleep)
        return counter.value


async def gather_all(func):
    # Run every timing pair concurrently on one event loop
    return await asyncio.gather(*[func(pre, post) for pre, post in TIMINGS])


if __name__ == "__main__":
    print("*** Showing that multiprocessing.Value is multiprocess safe ***")
    with concurrent.futures.ProcessPoolExecutor() as executor:
        print("Testing concurrent returning inside of lock...")
        futures = [executor.submit(incr_counter_context, *timings) for timings in TIMINGS]
        print("Returning value inside of lock context won't cause duplicates when using non-asynchronous executor")
        print([future.result() for future in futures])

        print("Testing concurrent returning outside lock...")
        futures = [executor.submit(incr_counter, *timings) for timings in TIMINGS]
        print("Returning value outside of lock context can cause duplicate values")
        print([future.result() for future in futures])

    print("*** Showing that multiprocessing.Value is not async safe ***")
    print("Testing async returning outside of lock...")
    print(asyncio.run(gather_all(aincr_counter)))
    print("Testing async returning inside of lock...")
    print(asyncio.run(gather_all(aincr_counter_context)))
Here is the output of the above:
*** Showing that multiprocessing.Value is multiprocess safe ***
Testing concurrent returning inside of lock...
Returning value inside of lock context won't cause duplicates when using non-asynchronous executor
[1, 3, 2]
Testing concurrent returning outside lock...
Returning value outside of lock context can cause duplicate values
[4, 6, 6]
*** Showing that multiprocessing.Value is not async safe ***
Testing async returning outside of lock...
[8, 9, 9]
Testing async returning inside of lock...
[11, 12, 12]
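The inside-lock async case still duplicates because the lock created for a multiprocessing.Value is recursive by default, so every coroutine running in the same thread can acquire it again while another coroutine is suspended inside it. Below is a minimal sketch of the asyncio-native alternative, assuming a plain integer counter and the same timings scaled down to tenths of a second; the names `aincr_counter_locked`, `SCALE` and `main` are hypothetical.

```python
import asyncio

# Same (pre, post) pairs as the example above, scaled to tenths of a second
# (an assumption made only to keep the demo fast).
TIMINGS = [(0, 0), (1, 1), (0, 2)]
SCALE = 0.1

counter = 0  # plain int: all coroutines run in one thread, so no shared memory is needed


async def aincr_counter_locked(lock, pre_incr_sleep, pre_return_sleep):
    """Increment and read the counter while holding an asyncio.Lock."""
    global counter
    await asyncio.sleep(pre_incr_sleep * SCALE)
    async with lock:  # other coroutines wait here until the holder releases it
        counter += 1
        await asyncio.sleep(pre_return_sleep * SCALE)
        return counter  # leaving the block releases the lock


async def main():
    lock = asyncio.Lock()  # created inside the running event loop
    return await asyncio.gather(
        *(aincr_counter_locked(lock, pre, post) for pre, post in TIMINGS)
    )


results = asyncio.run(main())
print(results)  # three distinct values, one per coroutine
```

Note that asyncio.Lock only coordinates coroutines on a single event loop; it does nothing across processes.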
Luckily, you are using Flask, which is synchronous, so the async problem isn't a concern for your use case.
So I would suggest changing the accepted answer to read the value into a local variable inside the lock context and then release the lock as soon as possible. If you call jsonify or anything else while still holding the lock, you keep the lock during operations that don't require it.
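from multiprocessing import Value
from flask import Flask, jsonify

counter = Value('i', 0)
app = Flask(__name__)

@app.route('/')
def index():
    with counter.get_lock():
        counter.value += 1
        # save the value ASAP rather than passing to jsonify
        # to keep lock time short
        unique_count = counter.value

    return jsonify(count=unique_count)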
Counting concurrently is hard. Assume the count is 0. If two users both hit the endpoint at close enough intervals, they may each get the value 0, increment it to 1, and put it back. Two users hit the endpoint, but the resulting count is 1, not 2. To get around this, you need to use a data store that supports incrementing atomically (as in, an operation that only one process can do at a time).
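The lost update described above can be reproduced deterministically. This sketch uses two threads in place of two users (an illustrative stand-in; separate processes race the same way) and a threading.Barrier to force both reads to happen before either write. The helper names are hypothetical.

```python
import threading

count = 0
barrier = threading.Barrier(2)  # both "users" must read before either writes
lock = threading.Lock()


def unsafe_increment():
    global count
    current = count      # read: both threads see the same value
    barrier.wait()       # force the unlucky interleaving
    count = current + 1  # write back: the second write overwrites the first


def safe_increment():
    global count
    with lock:           # only one thread at a time may read-modify-write
        count += 1


def run_twice(target):
    """Reset the counter, run `target` in two threads, return the final count."""
    global count
    count = 0
    threads = [threading.Thread(target=target) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return count


lost = run_twice(unsafe_increment)
kept = run_twice(safe_increment)
print(lost, kept)  # 1 2
```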
You can't use a simple Python global, because WSGI servers typically spawn multiple worker processes and each process has its own independent copy of the global. Repeated requests could be handled by different processes, resulting in different, unsynchronized values.
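Here is a minimal sketch of that problem outside of any WSGI server, assuming multiprocessing.Process workers stand in for the server's worker processes; `handle_request` is a hypothetical handler. Each worker increments its own copy of the global, and the parent's copy never changes.

```python
import multiprocessing

counter = 0  # a plain global: every worker process ends up with its own copy


def handle_request(results):
    """Hypothetical request handler: bump the global, report what this process saw."""
    global counter
    counter += 1
    results.put(counter)


if __name__ == "__main__":
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=handle_request, args=(results,))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    seen = [results.get() for _ in workers]  # drain the queue before joining
    for w in workers:
        w.join()
    print("each worker saw:", seen)  # [1, 1, 1, 1]
    print("parent sees:", counter)   # 0
```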
The simplest solution is a Python multiprocessing.Value. This synchronizes access to a shared value across processes, as long as the processes are spawned after the value is created.
from multiprocessing import Value

from flask import Flask, jsonify

counter = Value('i', 0)
app = Flask(__name__)

@app.route('/')
def index():
    with counter.get_lock():
        counter.value += 1
        out = counter.value

    return jsonify(count=out)

if __name__ == '__main__':
    # Flask's development server is threaded by default, and it can't be
    # multi-threaded and multi-process at the same time.
    app.run(processes=8, threaded=False)
    # access http://localhost:5000/ multiple times quickly, the count will be correct
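The same Value-plus-lock pattern can be checked without Flask. This sketch assumes plain multiprocessing.Process workers in place of the server's processes; the names `worker`, `run`, `N_WORKERS` and `INCREMENTS_PER_WORKER` are hypothetical. Passing the Value to each worker as an argument, rather than relying on a module-level global, keeps it working under the "spawn" start method as well.

```python
from multiprocessing import Process, Value

N_WORKERS = 4
INCREMENTS_PER_WORKER = 1000


def worker(counter):
    """Increment the shared counter, holding its lock for each read-modify-write."""
    for _ in range(INCREMENTS_PER_WORKER):
        with counter.get_lock():
            counter.value += 1


def run():
    counter = Value('i', 0)  # created before any worker process starts
    procs = [Process(target=worker, args=(counter,)) for _ in range(N_WORKERS)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


total = None
if __name__ == "__main__":
    total = run()
    print(total)  # 4000
```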
There are still some caveats:
- The data only persists as long as the server's processes are alive. If you restart the server, the counter resets too.
- If the application processes are distributed across multiple machines, shared memory has the same issue as globals: it is only synchronized on the local machine, not across the network.
For real-world scenarios, Redis is a much more robust solution. The server is independent of the web application, has options for persistence, and can do atomic increments. It can also be used for other parts of the application, such as caching.