Can dask parralelize reading fom a csv file?
Piggybacking off of @MRocklin's answer, in newer versions of dask, you can use df.compute(scheduler='processes')
or df.compute(scheduler='threads')
to convert to pandas using multiprocessing or multithreading:
from dask import dataframe as ddf
df = ddf.read_csv("data/Measurements*.csv",
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df = df.compute(scheduler='processes') # convert to pandas
df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')
df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
Yes, dask.dataframe can read in parallel. However you're running into two problems:
Pandas.read_csv only partially releases the GIL
By default dask.dataframe parallelizes with threads because most of Pandas can run in parallel in multiple threads (releases the GIL). Pandas.read_csv is an exception, especially if your resulting dataframes use object dtypes for text
dask.dataframe.to_hdf(filename) forces sequential computation
Writing to a single HDF file will force sequential computation (it's very hard to write to a single file in parallel.)
Edit: New solution
Today I would avoid HDF and use Parquet instead. I would probably use the multiprocessing or dask.distributed schedulers to avoid GIL issues on a single machine. The combination of these two should give you full linear scaling.
from dask.distributed import Client
client = Client()
df = dask.dataframe.read_csv(...)
df.to_parquet(...)
Solution
Because your dataset likely fits in memory, use dask.dataframe.read_csv to load in parallel with multiple processes, then switch immediately to Pandas.
import dask.dataframe as ddf
import dask.multiprocessing
df = ddf.read_csv("data/Measurements*.csv", # read in parallel
sep=';',
parse_dates=["DATETIME"],
blocksize=1000000,
)
df = df.compute(get=dask.multiprocessing.get) # convert to pandas
df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')
df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')