A better way to load MongoDB data to a DataFrame using Pandas and PyMongo?
The from_records
classmethod
is probably the best way to do it:
from pandas import pd
import pymongo
client = pymongo.MongoClient()
data = db.mydb.mycollection.find() # or db.mydb.mycollection.aggregate(pipeline)
df = pd.DataFrame.from_records(data)
an elegant way of doing it would be as follows:
import pandas as pd
def my_transform_logic(x):
if x :
do_something
return result
def process(cursor):
df = pd.DataFrame(list(cursor))
df['result_col'] = df['col_to_be_processed'].apply(lambda value: my_transform_logic(value))
#making list off dictionaries
db.collection_name.insert_many(final_df.to_dict('records'))
# or update
db.collection_name.update_many(final_df.to_dict('records'),upsert=True)
#make a list of cursors.. you can read the parallel_scan api of pymongo
cursors = mongo_collection.parallel_scan(6)
for cursor in cursors:
process(cursor)
I tried the above process on a mongoDB collection with 2.6 million records using Joblib on the above code. My code didnt throw any memory errors and the processing finished in 2 hrs.
I've modified my code to the following:
cursor = tweets.find(fields=['id'])
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)
By adding the fields parameter in the find() function I restricted the output. Which means that I'm not loading every field but only the selected fields into the DataFrame. Everything works fine now.
The fastest, and likely most memory-efficient way, to create a DataFrame from a mongodb query, as in your case, would be using monary.
This post has a nice and concise explanation.