Django: Duplicated logic between properties and queryset annotations

I don't think there is a silver bullet here. But I use this pattern in my projects for such cases.

class PickupTimeSlotAnnotatedManager(models.Manager):
    def with_nb_bookings(self):
        return self.annotate(
            _nb_bookings=Count(
                'order', filter=Q(order__status=Order.VALIDATED)
            )
        )

class PickupTimeSlot(models.Model):
    ...
    annotated = PickupTimeSlotAnnotatedManager()

    @property
    def nb_bookings(self) -> int:
        """ How many times this time slot is booked? """ 
        if hasattr(self, '_nb_bookings'):
            return self._nb_bookings
        return self.order_set.validated().count()

In code

qs = PickupTimeSlot.annotated.with_nb_bookings()
for item in qs:
    print(item.nb_bookings)

This way I can always use property, if it is part of annotated queryset it will use annotated value if not it will calculate it. This approach guaranties that I will have full control of when to make queryset "heavier" by annotating it with required values. If I don't need this I just use regular PickupTimeSlot.objects. ...

Also if there are many such properties you could write decorator that will wrap property and simplify code. It will work as cached_property decorator, but instead it will use annotated value if it is present.


TL;DR

  • Do you need to filter the "annotated field" results?

    • If Yes, "Keep" the manager and use it when required. In any other situation, use property logic
    • If No, remove the manager/annotation process and stick with property implementation, unless your table is small (~1000 entries) and not growing over the period.
  • The only advantage of annotation process I am seeing here is the filtering capability on the database level of the data


I have conducted a few tests to reach the conclusion, here they are

Environment

  • Django 3.0.7
  • Python 3.8
  • PostgreSQL 10.14

Model Structure

For the sake of simplicity and simulation, I am following the below model representation

class ReporterManager(models.Manager):
    def article_count_qs(self):
        return self.get_queryset().annotate(
            annotate_article_count=models.Count('articles__id', distinct=True))


class Reporter(models.Model):
    objects = models.Manager()
    counter_manager = ReporterManager()
    name = models.CharField(max_length=30)

    @property
    def article_count(self):
        return self.articles.distinct().count()

    def __str__(self):
        return self.name


class Article(models.Model):
    headline = models.CharField(max_length=100)
    reporter = models.ForeignKey(Reporter, on_delete=models.CASCADE,
                                 related_name="articles")

    def __str__(self):
        return self.headline

I have populated my database, both Reporter and Article model with random strings.

  • Reporter rows ~220K (220514)
  • Article rows ~1M (997311)

Test Cases

  1. Random picking of Reporter instance and retrieves the article count. We usually do this in the Detail View
  2. A paginated result. We slice the queryset and iterates over the sliced queryset.
  3. Filtering

I am using the %timeit-(ipython doc) command of Ipython shell to calculate the execution time

Test Case 1

For this, I have created these functions, which randomly pick instances from the database

import random

MAX_REPORTER = 220514


def test_manager_random_picking():
    pos = random.randint(1, MAX_REPORTER)
    return Reporter.counter_manager.article_count_qs()[pos].annotate_article_count


def test_property_random_picking():
    pos = random.randint(1, MAX_REPORTER)
    return Reporter.objects.all()[pos].article_count

Results

In [2]: %timeit test_manager_random_picking()
8.78 s ± 6.1 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [3]: %timeit test_property_random_picking()
6.36 ms ± 221 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Test Case 2

I have created another two functions,

import random

PAGINATE_SIZE = 50


def test_manager_paginate_iteration():
    start = random.randint(1, MAX_REPORTER - PAGINATE_SIZE)
    end = start + PAGINATE_SIZE
    qs = Reporter.counter_manager.article_count_qs()[start:end]
    for reporter in qs:
        reporter.annotate_article_count


def test_property_paginate_iteration():
    start = random.randint(1, MAX_REPORTER - PAGINATE_SIZE)
    end = start + PAGINATE_SIZE
    qs = Reporter.objects.all()[start:end]
    for reporter in qs:
        reporter.article_count

Results

In [8]: %timeit test_manager_paginate_iteration()
4.99 s ± 312 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [9]: %timeit test_property_paginate_iteration()
47 ms ± 1.16 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Test Case 3

undoubtedly, annotation is the only way here

Here you can see, the annotation process takes a huge amount of time as compared to the property implementation.


To avoid any duplication, one option could be:

  • remove the property in the Model
  • use a custom Manager
  • override it's get_queryset() method:
class PickupTimeSlotManager(models.Manager):

    def get_queryset(self):
        return super().get_queryset().annotate(
            db_nb_bookings=Count(
                'order', filter=Q(order__status=Order.VALIDATED)
            )
        )
from django.db import models
from .managers import PickupTimeSlotManager

class PickupTimeSlot(models.Model):
    ...
    # Add custom manager
    objects = PickupTimeSlotManager()

advantage: the calculated properties is transparently added to any queryset; no further action is required to use it

disadvantage: the computational overhead occurs even when the calculated property is not used