PyMongo cursor batch_size
Pymongo has some quality-of-life helpers for the Cursor
class, so it will automatically do the batching for you, and return result to you in terms of documents.
The batch_size
setting is set, but the idea is you only need to set it in the find()
method, and not have to do manual low level calls or iterating through the batches.
For example, if I have 100 documents in my collection:
> db.test.count()
100
I then set the profiling level to log all queries:
> db.setProfilingLevel(0,-1)
{
"was": 0,
"slowms": 100,
"sampleRate": 1,
"ok": 1,
...
I then use pymongo to specify batch_size
of 10:
import pymongo
import bson
conn = pymongo.MongoClient()
cur = conn.test.test.find({}, {'txt':0}, batch_size=10)
print(list(cur))
Running that query, I see in the MongoDB log:
2019-02-22T15:03:54.522+1100 I COMMAND [conn702] command test.test command: find { find: "test", filter: {} ....
2019-02-22T15:03:54.523+1100 I COMMAND [conn702] command test.test command: getMore { getMore: 266777378048, collection: "test", batchSize: 10, ....
(getMore repeated 9 more times)
So the query was fetched from the server in the specified batches. It's just hidden from you via the Cursor
class.
Edit
If you really need to get the documents in batches, there is a function find_raw_batches()
under Collection (doc link). This method works similarly to find()
and accepts the same parameters. However be advised that it will return raw BSON which will need to be decoded by the application in a separate step. Notably, this method does not support sessions.
Having said that, if the aim is to lower the application's memory usage, it's worth considering modifying the query so that it uses ranges instead. For example:
find({'$gte': <some criteria>, '$lte': <some other criteria>})
Range queries are easier to optimize, can use indexes, and (in my opinion) easier to debug and easier to restart should the query gets interrupted. This is less flexible when using batches, where you have to restart the query from scratch and go over all the batches again if it gets interrupted.
This is how I do it, it helps getting the data chunked up but I thought there would be a more straight forward way to do this. I created a yield_rows function that gets you the generates and yields chunks, it ensures the used chunks are deleted.
import pymongo as pm
CHUNK_SIZE = 500
client = pm.MongoClient()
coll = client.get_database('db').get_collection('coll')
cursor = coll.find({}, batch_size=CHUNK_SIZE)
def yield_rows(cursor, chunk_size):
"""
Generator to yield chunks from cursor
:param cursor:
:param chunk_size:
:return:
"""
chunk = []
for i, row in enumerate(cursor):
if i % chunk_size == 0 and i > 0:
yield chunk
del chunk[:]
chunk.append(row)
yield chunk
chunks = yield_rows(cursor, CHUNK_SIZE)
for chunk in chunks:
# do processing here
pass
If I find a cleaner, more efficient way to do this I'll update my answer.