common celery class python example
Example: common celery class python example
import multiprocessingimport celeryimport osclass MyTask(celery.Task): ignore_result = False def __init__(self): # this is called in the main process. # initialize the cache. This empty cache will be # copied (by fork) to each worker: self.cache = {} # this array will be copied between workers, but the Queues # themselves are shared between the workers, i.e. all # workers referencing self.broadcast_queues[3] will be # referencing the *same* queue self.broadcast_queues = [(multiprocessing.Queue(), multiprocessing.Value(‘i’)) for _ in max_num_workers] self.queue_acquisition_lock = multiprocessing.Lock() self.my_queue = None # will be set by individual workers def run(self, user_id, arg, invalidate=False): if self.my_queue is None: self.acquire_queue() if invalidate: self.broadcast_invalidations(user_id, arg) raise self.Ignore() # before anything else, process outstanding invalidations: self.process_cache_invalidations() # now you can go ahead and calculate: return self.normal_operation(user_id, arg) def acquire_queue(self): with self.queue_acquisition_lock: # find an unused queue for q, registered_pid in self.broadcast_queues: if registered_pid.value is not None: # someone else got it already continue # great, a free one — acquire it # (could be a race condition here, if not for # the queue_acquisition_lock) self.my_queue = q # let others know that it’s mine now registered_pid.value = os.getpid() break else: raise Exception("Could not find a free broadcast queue, did you reserve enough of them?") def broadcast_invalidations(user_id, arg): # forward the message to others: for q, is_acquired in self.broadcast_queues: if is_acquired.value is None: # queue was reserved by main process, but nobody # registered it — don’t push into it, because # nobody will consume it continue q.push((user_id, arg)) def process_cache_invalidations(self): """ Process invalidation requests that other workers (and also this one) received since my last run.""" while not self.my_queue.empty(): # get the next item to invalidate: user_id, arg = self.my_queue.pop() # .. and remove it from the cache: self.cache.pop((user_id, arg), None) def normal_operation(self, user_id, arg): """ Whatever your task actually wants to do, based on parameters and on self.cache. """ if (user_id, arg) not in self.cache: self.cache[(user_id, arg)] = user_id * 3 + arg * 5 return self.cache[(user_id, arg)]