How to separate files using dask groupby on a column
I'm not exactly sure what you need to achieve, but I don't think you need a groupby for your problem. It looks like a simple filtering problem to me.
You can just loop over all your files, filter the rows of each instrument, and append them to one new file per instrument.
Also, I don't have example files to experiment with, but I think you can simply use pandas with chunksize to read large CSV files.
Example:
```python
import pandas as pd
import glob
import os

# maybe play around to get better performance
chunksize = 1000000
files = glob.glob('./file_*.csv')
for f in files:
    for chunk in pd.read_csv(f, chunksize=chunksize):
        u_inst = chunk['instrument'].unique()
        for inst in u_inst:
            # filter instrument data
            inst_df = chunk[chunk.instrument == inst]
            # filter columns
            inst_df = inst_df[['time', 'code', 'val']]
            # append to instrument file
            # only write header if the file does not exist yet
            inst_file = f'./instrument_{inst}.csv'
            file_exist = os.path.isfile(inst_file)
            # index=False so the pandas index is not written as an extra column
            inst_df.to_csv(inst_file, mode='a', header=not file_exist, index=False)
```
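Below is a minimal, self-contained sketch of the same chunked split, assuming tiny synthetic input files in a temporary directory instead of your real file_*.csv files (the helper name split_by_instrument is hypothetical). It iterates over chunk.groupby("instrument") instead of filtering once per unique instrument, so each chunk is scanned once; the columns time, code and val are the ones used above.

```python
import glob
import os
import tempfile

import numpy as np
import pandas as pd


def split_by_instrument(pattern, out_dir, chunksize=1_000_000):
    """Append the rows of every CSV matching `pattern` to one CSV per instrument."""
    for f in sorted(glob.glob(pattern)):
        for chunk in pd.read_csv(f, chunksize=chunksize):
            # one pass over the chunk: groupby yields (instrument, rows) pairs
            for inst, inst_df in chunk.groupby("instrument"):
                inst_file = os.path.join(out_dir, f"instrument_{inst}.csv")
                # write the header only when the file is first created
                write_header = not os.path.isfile(inst_file)
                inst_df[["time", "code", "val"]].to_csv(
                    inst_file, mode="a", header=write_header, index=False
                )


# tiny synthetic demo input (hypothetical data, not from the question)
work = tempfile.mkdtemp()
rng = np.random.default_rng(0)
for i in range(3):
    pd.DataFrame({
        "time": np.arange(10),
        "instrument": rng.integers(1, 4, 10),
        "code": rng.integers(0, 5, 10),
        "val": rng.random(10),
    }).to_csv(os.path.join(work, f"file_{i}.csv"), index=False)

out = os.path.join(work, "out")
os.makedirs(out, exist_ok=True)
# a small chunksize forces several appends per instrument file
split_by_instrument(os.path.join(work, "file_*.csv"), out, chunksize=4)

total = sum(len(pd.read_csv(p)) for p in glob.glob(os.path.join(out, "instrument_*.csv")))
print(total)  # 30 rows in, 30 rows out
```

If a header were appended more than once, it would be read back as an extra data row, so the row count is a quick sanity check of the header logic.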
If every single file fits in memory, you can try this:
```python
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os
```
Generate dummy files
```python
fldr_in = "test_in"
fldr_out = "test_out"
N = int(1e6)
for i in range(10):
    fn = f"{fldr_in}/file{i}.csv"
    os.makedirs(os.path.dirname(fn), exist_ok=True)
    df = pd.DataFrame({"instrument": np.random.randint(10, 100, N),
                       "value": np.random.rand(N)})
    df.to_csv(fn, index=False)
```
Define function
The following function saves the rows of each instrument as a Parquet file at fldr_out/instrument=i/fileN.parquet, dropping the instrument column because it is already encoded in the directory name:
```python
def fun(x, fn, fldr_out):
    # every group contains a single instrument
    inst = x.instrument.unique()[0]
    filename = os.path.basename(fn)
    fn_out = f"{fldr_out}/instrument={inst}/{filename}"
    fn_out = fn_out.replace(".csv", ".parquet")
    os.makedirs(os.path.dirname(fn_out), exist_ok=True)
    x.drop("instrument", axis=1)\
     .to_parquet(fn_out, index=False)
```
and you can use it with a groupby:
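```python
# file names in the input folder (file0.csv ... file9.csv)
files = sorted(os.listdir(fldr_in))
for f in files:
    fn = f"{fldr_in}/{f}"
    df = pd.read_csv(fn)
    df.groupby("instrument").apply(lambda x: fun(x, fn, fldr_out))
```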
Performing Analysis with dask
Now you can use dask to read the results and perform your analysis:
```python
df = dd.read_parquet(fldr_out)
```