Efficient Rolling Max and Min Window

Here's one way to do it more efficiently. You still have to calculate the value occasionally but, other than certain degenerate data (ever decreasing values), that's minimised in this solution.

We'll limit ourselves to the maximum to simplify things but it's simple to extend to a minimum as well.

All you need is the following:

  • The window itself, initially empty.
  • The current maximum (max), initially any value.
  • The count of the current maximum (maxcount), initially zero.

The idea is to use max and maxcount as a cache for holding the current maximum. Where the cache is valid, you only need to return the value in it, a very fast constant-time operation.

If the cache is invalid when you ask for the maximum, it populates the cache and then returns that value. This is slower than the method in the previous paragraph but subsequent requests for the maximum once the cache is valid again use that faster method.

Here's what you do for maintaining the window and associated data:

  1. Get the next value N.

  2. If the window is full, remove the earliest entry M. If maxcount is greater than 0 and M is equal to max, decrement maxcount. Once maxcount reaches 0, the cache is invalid but we don't need to worry about that until such time the user requests the maximum value (there's no point repopulating the cache until then).

  3. Add N to the rolling window.

  4. If the window size is now 1 (that N is the only current entry), set max to N and maxcount to 1, then go back to step 1.

  5. If maxcount is greater than 0 and N is greater than max, set max to N and maxcount to 1, then go back to step 1.

  6. If maxcount is greater than 0 and N is equal to max, increment maxcount.

  7. Go back to step 1.

Now, at any point while that window management is going on, you may request the maximum value. This is a separate operation, distinct from the window management itself. This can be done using the following rules in sequence.

  1. If the window is empty, there is no maximum: raise an exception or return some sensible sentinel value.

  2. If maxcount is greater than 0, then the cache is valid: simply return max.

  3. Otherwise, the cache needs to be repopulated. Go through the entire list, setting up max and maxcount as per the code snippet below.


set max to window[0], maxcount to 0
for each x in window[]:
    if x > max:
        set max to x, maxcount to 1
    else:
        if x == max:
            increment maxcount

The fact that you mostly maintain a cache of the maximum value and only recalculate when needed makes this a much more efficient solution than simply recalculating blindly whenever an entry is added.

For some definite statistics, I created the following Python program. It uses a sliding window of size 25 and uses random numbers from 0 to 999 inclusive (you can play with these properties to see how they affect the outcome).

First some initialisation code. Note the stat variables, they'll be used to count cache hits and misses:

import random

window = []
max = 0
maxcount = 0
maxwin = 25

statCache = 0
statNonCache = 0

Then the function to add a number to the window, as per my description above:

def addNum(n):
    global window
    global max
    global maxcount
    if len(window) == maxwin:
        m = window[0]
        window = window[1:]
        if maxcount > 0 and m == max:
            maxcount = maxcount - 1

    window.append(n)

    if len(window) == 1:
        max = n
        maxcount = 1
        return

    if maxcount > 0 and n > max:
        max = n
        maxcount = 1
        return

    if maxcount > 0 and n == max:
        maxcount = maxcount + 1

Next, the code which returns the maximum value from the window:

def getMax():
    global max
    global maxcount
    global statCache
    global statNonCache

    if len(window) == 0:
        return None

    if maxcount > 0:
        statCache = statCache + 1
        return max

    max = window[0]
    maxcount = 0
    for val in window:
        if val > max:
            max = val
            maxcount = 1
        else:
            if val == max:
                maxcount = maxcount + 1
    statNonCache = statNonCache + 1

    return max

And, finally, the test harness:

random.seed()
for i in range(1000000):
    val = int(1000 * random.random())
    addNum(val)
    newmax = getMax()

print("%d cached, %d non-cached"%(statCache,statNonCache))

Note that the test harness attempts to get the maximum for every time you add a number to the window. In practice, this may not be needed. In other words, this is the worst-case scenario for the random data generated.


Running that program a few times for pseudo-statistical purposes, we get (formatted and analysed for reporting purposes):

 960579 cached,  39421 non-cached
 960373 cached,  39627 non-cached
 960395 cached,  39605 non-cached
 960348 cached,  39652 non-cached
 960441 cached,  39559 non-cached
 960602 cached,  39398 non-cached
 960561 cached,  39439 non-cached
 960463 cached,  39537 non-cached
 960409 cached,  39591 non-cached
 960798 cached,  39202 non-cached
=======         ======
9604969         395031

So you can see that, on average for random data, only about 3.95% of the cases resulted in a calculation hit (cache miss). The vast majority used the cached values. That should be substantially better than having to recalculate the maximum on every insertion into the window.

Some things that will affect that percentage will be:

  • The window size. Larger sizes means that there's more likelihood of a cache hit, improving the percentage. For example, doubling the window size pretty much halved the cache misses (to 1.95%).
  • The range of possible values. Less choice here means that there's more likely to be cache hits in the window. For example, reducing the range from 0..999 to 0..9 gave a big improvement in reducing cache misses (0.85%).

The algorithm you want to use is called the ascending minima (C++ implementation).

To do this in C#, you will want to get a double ended queue class, and a good one exists on NuGet under the name Nito.Deque.

I have written a quick C# implementation using Nito.Deque, but I have only briefly checked it, and did it from my head so it may be wrong!

public static class AscendingMinima
{
    private struct MinimaValue
    {
        public int RemoveIndex { get; set; }
        public double Value { get; set; }
    }

    public static double[] GetMin(this double[] input, int window)
    {
        var queue = new Deque<MinimaValue>();
        var result = new double[input.Length];

        for (int i = 0; i < input.Length; i++)
        {
            var val = input[i];

            // Note: in Nito.Deque, queue[0] is the front
            while (queue.Count > 0 && i >= queue[0].RemoveIndex)
                queue.RemoveFromFront();

            while (queue.Count > 0 && queue[queue.Count - 1].Value >= val)
                queue.RemoveFromBack();

            queue.AddToBack(new MinimaValue{RemoveIndex = i + window, Value = val });

            result[i] = queue[0].Value;
        }

        return result;
    }
}

Tags:

C#

Performance