Common Celery task class example (Python)

Example: a common Celery task class in Python

import multiprocessing
import os
import queue

import celery
from celery.exceptions import Ignore

# Number of broadcast queues to pre-allocate; it must be at least the number
# of worker processes that will run this task.
# NOTE(review): defaults to the CPU count on the assumption that the worker
# pool uses its default concurrency -- set it to your real concurrency.
max_num_workers = os.cpu_count() or 1


class MyTask(celery.Task):
    """Task with a per-worker cache that can be invalidated across workers.

    Every worker process keeps its own ``cache`` dict.  To invalidate an
    entry everywhere, one worker pushes the key into *all* claimed broadcast
    queues; each worker drains its own queue before doing real work, so the
    invalidation takes effect the next time that worker runs the task.

    The design assumes a fork-based worker pool: the queues, the slot
    markers and the lock are created once and inherited by every worker.
    """

    ignore_result = False

    # Slot marker meaning "no worker has claimed this queue yet".  A
    # ``multiprocessing.Value('i')`` holds a C int and can never be ``None``,
    # so 0 (never the PID of a worker process) serves as the sentinel.
    _UNCLAIMED = 0

    def __init__(self):
        """Create the shared queues, the slot markers and the empty cache.

        Intended to run in the parent process before the workers are forked,
        so that all workers inherit the very same queue objects.
        """
        super().__init__()
        # Starts empty; after the fork every worker owns a private copy.
        self.cache = {}
        # The list itself is copied into each worker, but the Queue and
        # Value objects inside it are shared: every worker that looks at
        # self.broadcast_queues[3] sees the *same* queue and owner marker.
        self.broadcast_queues = [
            (multiprocessing.Queue(),
             multiprocessing.Value('i', self._UNCLAIMED))
            for _ in range(max_num_workers)
        ]
        self.queue_acquisition_lock = multiprocessing.Lock()
        self.my_queue = None  # set lazily by each worker in acquire_queue()

    def run(self, user_id, arg, invalidate=False):
        """Compute the result for ``(user_id, arg)`` or invalidate it.

        Args:
            user_id: first component of the cache key.
            arg: second component of the cache key.
            invalidate: when true, broadcast an invalidation request for
                ``(user_id, arg)`` to all workers instead of computing.

        Returns:
            The (possibly cached) value from ``normal_operation``.

        Raises:
            celery.exceptions.Ignore: after broadcasting an invalidation,
                so that no result is recorded for that call.
            RuntimeError: if this worker could not claim a broadcast queue.
        """
        if self.my_queue is None:
            self.acquire_queue()
        if invalidate:
            self.broadcast_invalidations(user_id, arg)
            raise Ignore()
        # Before anything else, apply invalidations that arrived meanwhile.
        self.process_cache_invalidations()
        return self.normal_operation(user_id, arg)

    def acquire_queue(self):
        """Claim one unused broadcast queue for the current process.

        Raises:
            RuntimeError: if every pre-allocated queue is already claimed.
        """
        # The lock makes "check the marker, then write my PID" atomic
        # across workers; without it two workers could claim the same slot.
        with self.queue_acquisition_lock:
            for candidate, owner_pid in self.broadcast_queues:
                if owner_pid.value != self._UNCLAIMED:
                    # Another worker already owns this one.
                    continue
                self.my_queue = candidate
                # Publish the claim so other workers skip this slot.
                owner_pid.value = os.getpid()
                break
            else:
                raise RuntimeError("Could not find a free broadcast queue, did you reserve enough of them?")

    def broadcast_invalidations(self, user_id, arg):
        """Send an invalidation request for ``(user_id, arg)`` to all workers.

        The request also lands in this worker's own queue and is applied on
        its next regular run.
        """
        for target, owner_pid in self.broadcast_queues:
            if owner_pid.value == self._UNCLAIMED:
                # Reserved up front but never claimed: nobody would ever
                # consume the message, so don't let the queue fill up.
                continue
            target.put((user_id, arg))

    def process_cache_invalidations(self):
        """Apply the invalidation requests queued since the last run.

        Drains this worker's own queue and drops each named entry from the
        local cache; keys that are not cached are ignored.
        """
        while True:
            try:
                # Queue.empty() is documented as unreliable, so drain until
                # get_nowait() itself reports that nothing is left.
                user_id, arg = self.my_queue.get_nowait()
            except queue.Empty:
                break
            self.cache.pop((user_id, arg), None)

    def normal_operation(self, user_id, arg):
        """Return the value for ``(user_id, arg)``, computing it on a miss.

        Stand-in for whatever the task really does with its parameters and
        ``self.cache``.
        """
        key = (user_id, arg)
        if key not in self.cache:
            self.cache[key] = user_id * 3 + arg * 5
        return self.cache[key]

Tags:

Misc Example