Parallelize apply after pandas groupby
I have a hack I use for getting parallelization in Pandas. I break my dataframe into chunks, put each chunk in a list, and then use IPython's parallel bits to do a parallel apply on the list of dataframes. Then I put the list back together using the pandas concat function.
This is not generally applicable, however. It works for me because the function I want to apply to each chunk of the dataframe takes about a minute, and pulling the data apart and putting it back together does not take very long by comparison. So this is clearly a kludge. With that said, here's an example. I'm using an IPython notebook, so you'll see %%time magic in my code:
## make some example data
import pandas as pd
import numpy as np

np.random.seed(1)
n = 10000
df = pd.DataFrame({'mygroup': np.random.randint(1000, size=n),
                   'data': np.random.rand(n)})
grouped = df.groupby('mygroup')
For this example I'm going to make 'chunks' based on the groupby above, but the data doesn't have to be chunked this way, even though it's a pretty common pattern (an alternative is sketched after the next code block).
dflist = []
for name, group in grouped:
    dflist.append(group)
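If you don't need group-based chunks, here's a minimal sketch of an alternative, fixed-size chunking (the choice of 6 chunks is arbitrary, roughly matching my number of engines):

import numpy as np

nchunks = 6
bounds = np.linspace(0, len(df), nchunks + 1, dtype=int)
dflist = [df.iloc[bounds[i]:bounds[i + 1]] for i in range(nchunks)]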
Set up the parallel bits:
from IPython.parallel import Client  # in current IPython this lives in the separate ipyparallel package

rc = Client()
lview = rc.load_balanced_view()
lview.block = True
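As a quick sanity check (assuming the engines were started beforehand, e.g. with ipcluster start -n 6), rc.ids lists the running engines:

len(rc.ids)  # should match the number of engines you started, e.g. 6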
Write a silly function to apply to our data:
def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf
Now let's run the code in serial and then in parallel. Serial first:
%%time
serial_list = list(map(myFunc, dflist))
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
Now in parallel:
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
Then it takes only a few hundred milliseconds to merge the results back into one dataframe:
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
I'm running 6 IPython engines on my MacBook, but you can see it drops the execution time from 14 s down to about 2 s.
For really long-running stochastic simulations I can use an AWS backend by firing up a cluster with StarCluster. Much of the time, however, I parallelize just across the 8 CPUs on my MBP.
This seems to work, although it really should be built into pandas:
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print('parallel version: ')
    print(applyParallel(df.groupby(df.index), tmpFunc))
    print('regular version: ')
    print(df.groupby(df.index).apply(tmpFunc))
    print('ideal version (does not work): ')
    # df.groupby(df.index).applyParallel(tmpFunc)  # no such method; this is the API I wish pandas had
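One caveat: groupby.apply attaches the group key to each chunk as a .name attribute, which plain iteration over the groupby does not. If your function relies on that, a hedged variant (the helper _run_with_name and the wrapper name are mine, for illustration) can re-attach it:

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def _run_with_name(func, name, group):
    group.name = name  # mimic the .name attribute groupby.apply provides
    return func(group)

def applyParallelWithName(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(_run_with_name)(func, name, group) for name, group in dfGrouped)
    return pd.concat(retLst)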
Ivan's answer is great, but it looks like it can be slightly simplified, also removing the need to depend on joblib:
import pandas
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)
By the way: this cannot replace just any groupby.apply(), but it covers the typical cases: it should cover cases 2 and 3 in the documentation, while you can obtain the behaviour of case 1 by passing axis=1 to the final pandas.concat() call (see the sketch after the list below).
EDIT: the docs have changed; the old version can be found here. In any case, I'm copying the three cases below.
case 1: group DataFrame, apply aggregation function (f(chunk) -> Series), yield DataFrame with group axis having group labels
case 2: group DataFrame, apply transform function (f(chunk) -> DataFrame with same indexes), yield DataFrame with resulting chunks glued together
case 3: group Series, apply function with f(chunk) -> DataFrame, yield DataFrame with result of chunks glued together
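To illustrate the case 1 point, here is a minimal sketch (the names applyParallelAgg and agg_func are mine, and it assumes a frame with a 'data' column like the one in the accepted answer; as above, guard the call with if __name__ == '__main__') where each worker returns a Series and the axis=1 concat lines them up as one column per group:

import pandas
from multiprocessing import Pool, cpu_count

def agg_func(group):
    # case 1: aggregation, f(chunk) -> Series
    return group['data'].agg(['mean', 'max'])

def applyParallelAgg(dfGrouped, func):
    names, groups = zip(*list(dfGrouped))
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, groups)
    result = pandas.concat(ret_list, axis=1)  # one column per group
    result.columns = names
    return result.T  # transpose so each group becomes a row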
A short comment to accompany JD Long's answer. I've found that if the number of groups is very large (say hundreds of thousands) and your apply function is doing something fairly simple and quick, then breaking your dataframe into chunks and assigning each chunk to a worker to carry out a groupby-apply in serial can be much faster than doing a parallel groupby-apply where the workers read off a queue containing a multitude of groups. Example:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
So our dataframe looks like:
a
0 3425
1 1016
2 8141
3 9263
4 8018
Note that column 'a' has many groups (think customer ids):
len(df.a.unique())
15000
A function to operate on our groups:
def f1(group):
    time.sleep(0.0001)
    return group
Start a pool:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
Do a parallel groupby-apply:
%%time
for name, group in df.groupby('a'):
    p = ppe.submit(f1, group)
    futures.append(p)
for future in as_completed(futures):
    r = future.result()
    results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
Let's now add a column which partitions the df into many fewer groups:
df['b'] = np.random.randint(0, 12, nrows)
Now instead of 15000 groups there are only 12:
len(df.b.unique())
12
We'll partition our df and do a groupby-apply on each chunk.
ppe = ProcessPoolExecutor(12)
futures = []   # reset from the previous run so old results aren't double-counted
results = []
Wrapper function:
def f2(df):
    df.groupby('a').apply(f1)   # f1 returns each group unchanged, so df itself is the result
    return df
Send out each chunk to be operated on in serial:
%%time
for i in df.b.unique():
    p = ppe.submit(f2, df[df.b == i])
    futures.append(p)
for future in as_completed(futures):
    r = future.result()
    results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
Note that the amount of time spent per group has not changed. What has changed is the length of the queue the workers read from. I suspect that the workers cannot access the shared memory simultaneously, and with many tiny tasks they return constantly to read off the queue, stepping on each other's toes. With larger chunks to operate on, the workers return less frequently, so this problem is ameliorated and the overall execution is faster.
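The same chunking can be done without adding a helper column, e.g. by batching the unique keys directly; a sketch reusing df, f2, and pd from above (the np.array_split over the keys is my own variation, not from the original answer):

import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed

key_batches = np.array_split(df.a.unique(), 12)  # 12 batches of keys, one task per batch
ppe = ProcessPoolExecutor(12)
futures = [ppe.submit(f2, df[df.a.isin(batch)]) for batch in key_batches]
results = [future.result() for future in as_completed(futures)]
df_output = pd.concat(results)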