Reading files from disk in Python in Parallel
You still probably want to use multiprocessing, just structure it a bit differently:
from multiprocessing import Pool
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
filenames = ('file_%.4d.dat' % i for i in range(N))
def myshaper(fname):
    # Load one text file and reshape it to (N1, N2).
    return np.loadtxt(fname).reshape([N1, N2])
pool = Pool()
for i, temp_array in enumerate(pool.imap(myshaper, filenames)):
    result[i, :, :] = temp_array
pool.close()
pool.join()
What this does is first get a generator for the file names in filenames. This means the file names are not all stored in memory at once, but you can still loop over them. Next, it defines a function that loads and reshapes a file. This has to be an ordinary named function rather than a lambda (Python's anonymous function, similar to anonymous functions in MATLAB), because multiprocessing pickles the function to send it to the worker processes, and lambdas cannot be pickled. Then it applies that function to each file name using multiple processes, and puts each result in the overall array. Finally, it closes the pool and waits for the worker processes to finish.
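To try these steps end to end without the real data, here is a minimal sketch that writes a few small files to a temporary directory and loads them back in parallel. The tiny sizes, the temporary directory, and the helper names (file_names, load_and_reshape, write_test_files, load_all) are assumptions made for illustration, not part of the original setup.

```python
import os
import tempfile
from multiprocessing import Pool

import numpy as np

N, N1, N2 = 8, 4, 3  # tiny, hypothetical sizes for a quick test


def file_names(directory, n):
    """Lazily yield paths named file_0000.dat, file_0001.dat, ..."""
    return (os.path.join(directory, 'file_%.4d.dat' % i) for i in range(n))


def load_and_reshape(fname):
    """Load one text file and reshape it to (N1, N2)."""
    return np.loadtxt(fname).reshape(N1, N2)


def write_test_files(directory, n):
    """Write n small files; file i is filled with the value i."""
    for i, fname in enumerate(file_names(directory, n)):
        np.savetxt(fname, np.full(N1 * N2, float(i)))


def load_all(directory, n, processes=2):
    """Load all n files in parallel into one (n, N1, N2) array."""
    result = np.empty((n, N1, N2))
    with Pool(processes) as pool:
        # imap yields results in input order, so index i matches file i.
        names = file_names(directory, n)
        for i, arr in enumerate(pool.imap(load_and_reshape, names)):
            result[i] = arr
    return result


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as tmp:
        write_test_files(tmp, N)
        result = load_all(tmp, N)
        print(result.shape)
        print(result[:, 0, 0])
```

The `if __name__ == '__main__':` guard matters on platforms where worker processes are started by re-importing the main module; without it, each worker would try to run the loading code again.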
The first version uses somewhat more idiomatic Python. However, an approach that is more similar to your original one (although less idiomatic) might help you understand a bit better:
from multiprocessing import Pool
import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
def proccounter(counter):
    t_str="%.4d" % counter
    filename = 'file_'+t_str+'.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape=[N1,N2]
    return counter, temp_array
pool = Pool()
for counter, temp_array in pool.imap(proccounter, range(N)):
    result[counter,:,:] = temp_array
pool.close()
pool.join()
This just splits most of your for loop into a function, applies that function to each element of the range using multiple processors, then puts the result into the array. It is basically just your original function with the for loop split into two for loops.
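Because the worker returns the counter together with the array, the results do not have to arrive in order. As a sketch of one possible variation (not something the code above does), imap can be swapped for imap_unordered, which hands back each result as soon as it is ready. The function below is a hypothetical stand-in that builds an array instead of reading a file, and the sizes are made up so that it runs without any data on disk.

```python
from multiprocessing import Pool

import numpy as np

N1, N2 = 4, 3  # tiny, hypothetical sizes


def proc_counter(counter):
    """Stand-in for loading file number `counter`; returns (index, array)."""
    # Hypothetical placeholder for np.loadtxt(filename):
    # an array filled with the counter value.
    temp_array = np.full(N1 * N2, float(counter))
    temp_array.shape = (N1, N2)
    return counter, temp_array


def collect(n, processes=2):
    """Fill an (n, N1, N2) array from results that may arrive out of order."""
    result = np.empty((n, N1, N2))
    with Pool(processes) as pool:
        # The returned counter says which slot each array belongs in.
        for counter, temp_array in pool.imap_unordered(proc_counter, range(n)):
            result[counter] = temp_array
    return result


if __name__ == '__main__':
    result = collect(6)
    print(result[:, 0, 0])
```

Whether this is faster than ordered imap depends on how uneven the per-file load times are, so it is worth timing on the real files.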
It can also be done using the joblib library as follows:
def par_func(N1, N2, counter):
    import numpy as np
    t_str="%.4d" % counter
    filename = 'file_'+t_str+'.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape=[N1,N2]
    # temp_array = np.random.randn(N1, N2)  # use this line to test
    return temp_array

if __name__ == '__main__':
    import numpy as np
    N=1000
    N1=200
    N2=100
    from joblib import Parallel, delayed
    num_jobs = 2
    output_list = Parallel(n_jobs=num_jobs)(delayed(par_func)
                                            (N1, N2, counter)
                                            for counter in range(N))
    output_array = np.array(output_list)