Easiest way to read csv files with multiprocessing in Pandas
The dask library is designed to address exactly this kind of issue, among many others.
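For example, a minimal sketch (assuming the CSV files sit in a ./task_1/ folder, as in the answer below):

import dask.dataframe as dd

# dask discovers the files from the glob pattern, reads them in
# parallel, and compute() materializes a regular pandas DataFrame
ddf = dd.read_csv('./task_1/*.csv')
df = ddf.compute()
print(df.shape)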
I couldn't get map/map_async to work, but managed to get it working with apply_async.
Two possible ways (I have no idea which one is better):
- A) Concat at the end
- B) Concat during
I find glob an easy way to list and filter files in a directory:
from glob import glob
import pandas as pd
from multiprocessing import Pool

folder = "./task_1/"  # note the "/" at the end
file_list = glob(folder + '*.csv')

def my_read(filename):
    f = pd.read_csv(filename)
    # .to_numpy() replaces the removed .as_matrix(); the reshape is
    # specific to my data. Returning a DataFrame (not a bare ndarray)
    # is what lets pd.concat in the callback accept the result.
    return pd.DataFrame(f.VALUE.to_numpy().reshape(75, 90))

#DF_LIST = []         # A) concat at the end
DF = pd.DataFrame()   # B) concat during

def DF_LIST_append(result):
    #DF_LIST.append(result)                          # A) end
    global DF                                        # B) during
    DF = pd.concat([DF, result], ignore_index=True)  # B) during

pool = Pool(processes=8)
for file in file_list:
    pool.apply_async(my_read, args=(file,), callback=DF_LIST_append)
pool.close()
pool.join()

#DF = pd.concat(DF_LIST, ignore_index=True)  # A) end
print(DF.shape)
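If in doubt, A) is usually the better choice: B) copies the whole accumulated frame on every pd.concat call, while A) builds a list and concatenates once. A minimal sketch of A), reusing my_read and file_list from above and passing list.append directly as the callback:

results = []

pool = Pool(processes=8)
for file in file_list:
    # each worker's DataFrame is appended as soon as it is ready
    pool.apply_async(my_read, args=(file,), callback=results.append)
pool.close()
pool.join()

# one concat at the end instead of one per file
DF = pd.concat(results, ignore_index=True)
print(DF.shape)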
Using Pool:
import os
import pandas as pd
from multiprocessing import Pool

# wrap your csv importer in a function that can be mapped
def read_csv(filename):
    'converts a filename to a pandas dataframe'
    return pd.read_csv(filename)

def main():
    # get a list of file names; endswith() is safer than splitting on
    # '.', which fails on names without a dot
    files = os.listdir('.')
    file_list = [filename for filename in files if filename.endswith('.csv')]

    # set up your pool
    with Pool(processes=8) as pool:  # or whatever your hardware can support
        # have your pool map the file names to dataframes
        df_list = pool.map(read_csv, file_list)

    # reduce the list of dataframes to a single dataframe
    combined_df = pd.concat(df_list, ignore_index=True)
    print(combined_df.shape)

if __name__ == '__main__':
    main()
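For comparison, the same map-then-concat pattern works with the standard library's concurrent.futures instead of multiprocessing.Pool (a sketch of an alternative, not what the answer above uses):

import glob
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def read_csv(filename):
    return pd.read_csv(filename)

def main():
    file_list = glob.glob('*.csv')
    # executor.map preserves input order, just like Pool.map
    with ProcessPoolExecutor(max_workers=8) as executor:
        df_list = list(executor.map(read_csv, file_list))
    combined_df = pd.concat(df_list, ignore_index=True)
    print(combined_df.shape)

if __name__ == '__main__':
    main()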