How to use Python and OpenCV with multiprocessing?
I don't know what types of filters you need, but if they're reasonably simple, you could consider libvips. It's an image processing system for very large images (larger than the amount of memory you have). It came out of a series of EU-funded scientific art imaging projects, so the focus is on the types of operation you need for image capture and comparison: convolution, rank, morphology, arithmetic, colour analysis, resampling, histograms, and so on.
It's fast (faster than OpenCV, on some benchmarks at least), needs little memory, and there's a high-level Python binding. It works on Linux, OS X and Windows. It handles all the multiprocessing for you automatically.
After reading some SO posts, I've come up with a way to use OpenCV in Python 3 with multiprocessing. I recommend doing this on Linux because, according to this post, forked processes share memory with their parent as long as the content is not changed (copy-on-write). Here's a minimal example:
import cv2
import multiprocessing as mp
import numpy as np
import psutil

img = cv2.imread('test.tiff', cv2.IMREAD_ANYDEPTH)  # here I'm using an indexed 16-bit TIFF as an example
num_processes = 4
kernel_size = 11
tile_size = img.shape[0] // num_processes  # assuming img.shape[0] is divisible by num_processes
output = mp.Queue()

def mp_filter(x, output):
    print(psutil.virtual_memory())  # monitor memory usage
    # Put an (index, tile) tuple so the tiles can be put back in order later.
    output.put((x, cv2.GaussianBlur(img[tile_size * x:tile_size * (x + 1), :],
                                    (kernel_size, kernel_size), kernel_size / 5)))
    # note that you actually have to process a slightly larger block and leave out the border.

if __name__ == '__main__':
    processes = [mp.Process(target=mp_filter, args=(x, output)) for x in range(num_processes)]
    for p in processes:
        p.start()
    result = []
    # Drain the queue before joining, otherwise a child can block on a full pipe.
    for ii in range(num_processes):
        result.append(output.get(True))
    for p in processes:
        p.join()
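To make the remark about processing "a slightly larger block" concrete, here is a rough sketch of how the tile boundaries could be computed and how the tiles could be put back together. It assumes each worker reports its tile together with its index, and that a halo of `kernel_size // 2` rows is enough for the filter in question. The helper names `tile_bounds` and `stitch` are made up for illustration and are not part of OpenCV or multiprocessing.

```python
import numpy as np


def tile_bounds(height, num_tiles, halo):
    """Yield (read_lo, read_hi, crop_lo, crop_hi) for each horizontal tile.

    Rows [read_lo, read_hi) of the image are what a worker should filter;
    rows [crop_lo, crop_hi) of the filtered block are what it should keep.
    """
    for i in range(num_tiles):
        lo = height * i // num_tiles
        hi = height * (i + 1) // num_tiles
        read_lo = max(lo - halo, 0)       # extend upwards, clamped to the image
        read_hi = min(hi + halo, height)  # extend downwards, clamped to the image
        yield read_lo, read_hi, lo - read_lo, hi - read_lo


def stitch(indexed_tiles):
    """Reassemble (index, tile) pairs that may arrive in any order."""
    ordered = [tile for _, tile in sorted(indexed_tiles, key=lambda pair: pair[0])]
    return np.vstack(ordered)
```

A worker would then filter `img[read_lo:read_hi, :]`, keep rows `crop_lo:crop_hi` of the filtered block, and put that on the queue with its index. Sorting by index matters because a queue returns results in completion order, not submission order.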
Instead of using Queue, another way to collect the results from the processes is to create a shared array through the multiprocessing module (you have to import ctypes):
result = mp.Array(ctypes.c_uint16, img.shape[0]*img.shape[1], lock = False)
Then each process can write to a different portion of the array, assuming there is no overlap. Creating a large mp.Array is surprisingly slow, however, which defeats the purpose of speeding up the operation. So use it only when the added time is small compared with the total computation time. This array can be turned into a numpy array with:
result_np = np.frombuffer(result, dtype=ctypes.c_uint16)
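Putting the shared-array pieces together, a minimal sketch might look like the following. The names `fill_rows` and `run` are hypothetical, and the workers just write a constant into their rows where a real worker would write its filtered tile. It also assumes the number of rows is divisible by the number of workers.

```python
import ctypes
import multiprocessing as mp

import numpy as np


def fill_rows(shared, shape, row_lo, row_hi, value):
    """Write `value` into rows [row_lo, row_hi) of the shared image."""
    # Wrap the shared buffer without copying; writes land in shared memory.
    view = np.frombuffer(shared, dtype=np.uint16).reshape(shape)
    view[row_lo:row_hi, :] = value


def run(shape=(8, 4), num_workers=4):
    """Let each worker fill its own band of rows, then return the result."""
    # lock=False is only safe because the bands written by the workers do not overlap.
    shared = mp.Array(ctypes.c_uint16, shape[0] * shape[1], lock=False)
    rows = shape[0] // num_workers  # assumes shape[0] is divisible by num_workers
    workers = [mp.Process(target=fill_rows,
                          args=(shared, shape, i * rows, (i + 1) * rows, i + 1))
               for i in range(num_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return np.frombuffer(shared, dtype=np.uint16).reshape(shape).copy()
```

Call `run()` from under an `if __name__ == '__main__':` guard, as in the Queue example. The array comes back flat from `np.frombuffer`, so the `reshape` is what restores the image dimensions.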
This can be done cleanly with Ray, which is a library for parallel and distributed Python. Ray reasons about "tasks" instead of using a fork-join model, which gives some additional flexibility: you can put values in shared memory even after forking worker processes, the same code runs on multiple machines, you can compose tasks together, and so on.
import cv2
import numpy as np
import ray

num_tasks = 4
kernel_size = 11

@ray.remote
def mp_filter(image, i):
    lower = image.shape[0] // num_tasks * i
    upper = image.shape[0] // num_tasks * (i + 1)
    return cv2.GaussianBlur(image[lower:upper, :],
                            (kernel_size, kernel_size), kernel_size // 5)

if __name__ == '__main__':
    ray.init()
    # Load the image and store it once in shared memory.
    image = np.random.normal(size=(1000, 1000))
    image_id = ray.put(image)
    result_ids = [mp_filter.remote(image_id, i) for i in range(num_tasks)]
    results = ray.get(result_ids)
Note that you can store more than just numpy arrays in shared memory. You also benefit if you have Python objects that contain numpy arrays (like dictionaries containing numpy arrays). Under the hood, this uses the Plasma shared-memory object store and the Apache Arrow data layout.
You can read more in the Ray documentation. Note that I'm one of the Ray developers.