Python/Django polling of database has memory leak
Does the settings.py file for the daemon process have DEBUG = True? If so, Django keeps in memory a record of every SQL query it has run so far, which can lead to a memory leak.
You need to regularly reset the list of queries that Django keeps for debugging purposes. Normally it is cleared after every request, but since your application is not request-based, you need to do this manually:
from django import db
db.reset_queries()
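As a rough sketch of where that call could go in a long-running poller, the loop below clears the query log at the end of every cycle. The names run_polling_loop, poll_once, interval and max_cycles are made up for illustration; the reset function is passed in as an argument so the snippet runs without Django installed.

```python
import time

def run_polling_loop(poll_once, reset_queries, interval=1.0, max_cycles=None):
    """Call poll_once() repeatedly, clearing the query log after each cycle.

    poll_once      -- hypothetical callable that does one round of database work
    reset_queries  -- callable that clears the debug query log
                      (in a Django process this would be django.db.reset_queries)
    max_cycles     -- stop after this many cycles; None means run forever
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        try:
            poll_once()
        finally:
            # Clear the log even if this cycle raised, so it cannot accumulate
            reset_queries()
        cycles += 1
        time.sleep(interval)
    return cycles
```

In a Django daemon you would call it as run_polling_loop(your_poll_function, db.reset_queries, interval=5) after "from django import db", where your_poll_function is whatever does your database work.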
See also:
"Debugging Django memory leak with TrackRefs and Guppy" by Mikko Ohtamaa:
Django keeps track of all queries for debugging purposes (connection.queries). This list is reseted at the end of HTTP request. But in standalone mode, there are no requests. So you need to manually reset to queries list after each working cycle
"Why is Django leaking memory?" in Django FAQ - it talks both about setting
DEBUG to False, which is always important, and about clearing the list of queries using db.reset_queries(), which is important in applications like yours.
I had a lot of data crunching to do, so my solution was to use multiprocessing, starting a fresh pool for each batch so that whatever memory bloat had built up was released when the worker processes exited.
To keep it simple, I defined the worker and the callback as module-level functions rather than trying to make methods picklable.
Here it is in abstract form:
import multiprocessing as mp

WORKERS = 16  # I had 7 cores, allocated 16 because processing was I/O bound

# Module-level function, so it can be pickled and sent to a worker process
def worker(params):
    # do stuff
    something_for_the_callback_to_analyze = params  # placeholder result
    return something_for_the_callback_to_analyze

# Module-level function; runs in the parent process with the worker's return value
def worker_callback(worker_return_value):
    # report stuff, or pass
    pass

# My multiprocess_launcher was inside of a class
def multiprocess_launcher(params):
    collection = list(params)  # placeholder: somehow define a collection
    while collection:
        # Take a slice of at most WORKERS items
        pool_sub_batch = []
        for _ in range(WORKERS):
            if collection:  # as long as there's still something in the collection
                pool_sub_batch.append(collection.pop())
        # Start a fresh pool, limited to the size of the slice
        pool_size = min(WORKERS, len(pool_sub_batch))
        pool = mp.Pool(processes=pool_size)
        for sub_batch in pool_sub_batch:
            # args must be a tuple, hence the trailing comma
            pool.apply_async(worker, args=(sub_batch,), callback=worker_callback)
        pool.close()
        pool.join()  # worker processes exit here, taking their memory with them
        # Loop, more slices
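A possible variation on the above, not what I originally did: multiprocessing.Pool accepts a maxtasksperchild argument that replaces each worker process after it has completed that many tasks, which gives a similar "throw away the bloated process" effect with a single pool. A minimal sketch, where worker, process_all, tasks_per_child and the optional context parameter are illustrative names and the squaring is only a stand-in for real work:

```python
import multiprocessing as mp

def worker(item):
    # Placeholder for the real per-item work; must be a module-level function
    return item * item

def process_all(items, workers=4, tasks_per_child=10, context=None):
    """Map worker over items, replacing each worker process after
    tasks_per_child tasks so that memory it accumulated is freed."""
    # Use the platform's default start method unless a context is supplied
    ctx = mp.get_context() if context is None else context
    with ctx.Pool(processes=workers, maxtasksperchild=tasks_per_child) as pool:
        # chunksize=1 makes every item count as one task toward the limit
        return pool.map(worker, list(items), chunksize=1)
```

On platforms that start workers by spawning a new interpreter (Windows, and macOS by default), call process_all from under an if __name__ == "__main__": guard. A lower tasks_per_child frees memory sooner at the cost of more process start-ups.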