Sliding Window Minimum

February 22, 2011

We start with a function that creates a random input list:

(define (make-sequence len symb)
  (map (lambda (x) (randint symb)) (make-list len #f)))

For instance, (make-sequence 25 10) will produce a list of 25 non-negative integers less than 10:

> (make-sequence 25 10)
(5 4 9 3 3 0 8 5 9 8 7 4 9 3 8 4 4 9 3 9 2 4 0 1 0)

The slow algorithm recomputes the minimum at each position of the window:

(define (slow-swm k vs)
  (let loop ((vs vs) (len (- (length vs) k -1)) (zs '()))
    (if (zero? len) (reverse zs)
      (loop (cdr vs) (- len 1)
        (cons (apply min (take k vs)) zs)))))

Here’s an example:

> (slow-swm 3 '(4 3 2 1 5 7 6 8 9))
(2 1 1 1 5 6 6)

Now we begin our look at the fast algorithm by computing the initial queue:

(define (right-most v vs)
  (let loop ((pos (- (length vs) 1)) (vs (reverse vs)))
    (if (= v (car vs)) pos
      (loop (- pos 1) (cdr vs)))))

We find the right-most value in the queue because the minimum remains alive as long as possible. The iteration continues until all the minima have been found in the initial window.

(define (init k vs)
  (let loop ((ws (take k vs)) (as '()))
    (if (null? ws) (reverse as)
      (let* ((m (apply min ws)) (j (right-most m ws)))
        (loop (drop (+ j 1) ws)
              (cons (list m (+ k k (- (length ws)) j)) as))))))

We are using Harter’s terminology: the input list is vs and the queue is as, and later the output list will be rs. The queue is updated using the three-step algorithm given above; the filter performs the first step, the append performs the second step, and the if performs the third step:

(define (update j k v as)
  (let ((new-as (append (filter (lambda (z) (< (car z) v)) as)
                        (list (list v (+ j k))))))
    (if (< j (cadar new-as)) new-as (cdr new-as))))

Now we are ready for the fast solution. At each step we add the current minimum to the growing solution with (cons (caar as) rs), then update the queue and loop until the input is empty, when we add the final minimum to the output before returning it:

(define (fast-swm k vs)
  (let loop ((j k) (vs (drop k vs)) (as (init k vs)) (rs '()))
    (if (null? vs) (reverse (cons (caar as) rs))
      (let ((new-as (update j k (car vs) as)))
        (loop (+ j 1) (cdr vs) new-as (cons (caar as) rs))))))

Here’s an example:

> (fast-swm 3 '(4 3 2 1 5 7 6 8 9))
(2 1 1 1 5 6 6)

This is a tricky little program, and deserves a proper test. The easiest way to do that is to create a bunch of random inputs and confirm that the two algorithms return the same output:

(define (swm-test n)
  (do ((i 0 (+ i 1))) ((= i n))
    (let ((vs (make-sequence (randint 100 1000) (randint 10 500)))
          (k (randint 5 50)))
      (assert (slow-swm k vs) (fast-swm k vs)))))

As always, no news is good news:

> (swm-test 1000)

We used take, drop, randint and assert from the Standard Prelude. You can run the program at http://programmingpraxis.codepad.org/zr59LnZk.

By the way, this past Saturday, February 19th, was the second anniversary of Programming Praxis. My thanks to all my readers, and especially to those who have posted their solutions in the comments. Your response gives me the energy to keep going.

Pages: 1 2

6 Responses to “Sliding Window Minimum”

  1. Graham said

    Congrats on two great years, and here’s to many more!

    #!/usr/bin/env python
    
    
    def obvious(k, xs):
        mins = []
        for i in xrange(len(xs) - k + 1):
            mins.append(min(xs[i:i + k]))
        return mins
    
    
    def _init_queue(k, xs):
        q = []
        for i in xrange(k):
            m = min(xs[i:k])
            d = xs.index(m, i, k) + k
            if q == [] or q[-1] != (m, d):
                q.append((m, d))
        return q
    
    
    def harter(k, xs):
        q = _init_queue(k, xs)
        mins = []
        for i in xrange(1, len(xs) - k + 2):
            mins.append(q[0][0])
            m = min(xs[i:i + k])
            d = xs.index(m, i, i + k) + k
            q = [p for p in q if p[0] <= m]
            if q != [] and q[0][1] > i + 1:
                q.pop(0)
            if q == [] or q[-1] != (m, d):
                q.append((m, d))
        return mins
    
    
    ### Added later: testing
    from random import randrange
    
    
    def comp_methods(k, xs):
        os = obvious(k, xs)
        hs = harter(k, xs)
        return all(map(lambda (o, h): o == h, zip(os, hs))) and len(os) == len(hs)
    
    
    def n_tests(n):
        for _ in xrange(n):
            l = randrange(10, 1001)
            k = randrange(5, l // 2 + 1)
            xs = [randrange(10, 1001) for _ in xrange(l)]
            if not comp_methods(k, xs):
                print k
                print xs
                print obvious(k, xs)
                print harter(k, xs)
                return False
        return True
    
    
    if __name__ == "__main__":
        xs = [4, 3, 2, 1, 5, 7, 6, 8, 9]
        print obvious(3, xs)
        print harter(3, xs)
        print n_tests(100)
    
  2. Lautaro Pecile said

    This is my own solution. May not be the better one.

    import itertools
    
    def take(n, it):
        return list(itertools.islice(it, n))
    
    def tails(it):
        while True:
            tail, it = itertools.tee(it)
            yield list(tail)
            next(it)
    
    def swm(it, k):
        result = [min(take(k, l)) for l in 
                    itertools.takewhile(lambda l: len(l) >= k, tails(it))]
        return result
    
  3. Mike said

    More python ;-)

    #
    # this is the 'obvious' approach
    #
    from collections import deque
    from itertools import imap, islice, tee, count, repeat
    
    def win_min_1(iterable, ws):
        seq = iter(iterable)
        d = deque(islice(seq, ws), ws)
        while True:
            yield min(d)
            d.append(seq.next())
    
    #
    # a variation on the 'obvious' approach using parallel iterators
    #
    def win_min_2(iterable, ws):
        return imap(min, *(imap(islice, tee(iterable,ws), count(), repeat(None))))
    
    #
    # the 'improved' algorithm
    #
    def win_min_3(iterable, ws):
        seq = enumerate(iterable)
        
        #initialize the queue
        q = [(v,n+ws) for n,v in islice(seq,ws-1)]
        q = q[q.index(min(q)):]
    
        
        for n,v in seq:
            # discard 'big' values from the end of the queue
            while q and q[-1][0] >= v:
                _ = q.pop(-1)
    
            q.append((v, n+ws))
    
            # discard a 'too old' value from the front of the queue
            if q[0][1] <= n:
                _ = q.pop(0)
                
            yield q[0][0]
    
    
  4. Richard said

    I was interested in this since I use a number of algorithms that operate on sliding windows. On my machine, when I compare the C code on the tiac.net site to a dumber algorithm, I beat it on random data for a variety of window sizes. What I’ve always done is track just the best minimum and death time. No queues/rings or O(k) storage needed. When the best value dies before a better value is found, you just fall back on the naive O(k) search for the next best value. On strictly ascending data this degrades all the way down to O(nk) performance, but the expected performance on a typical dataset is close to O(n)–especially if the window size is large. Larger windows are better because it’s more likely you won’t ever have k increasing values, meaning you won’t ever have to do the O(k) search.

    Here’s a replacement for that tiac.net C code… just thrown together to use his terminology, so hopefully I didn’t make any mistakes. Try it out if you like and see if your results improve or not in your favorite language:

    void
    minwindow(int *in, int *out, int n, int k)
    {
        int i;
        int death = k;
        out[0] = in[0];
    
        for (i=1;i<n;i++) {
            if(in[i] <= out[i-1]) {
               out[i] = in[i];
               death = i+k;
            } else if(death == i) {
                int j;
                out[i] = in[i-k+1];
                death = i+1;
                for(j = (i-k)+2; j <= i; ++j) {
                   if(in[j] <= out[i]) { 
                      out[i] = in[j]; 
                      death = j+k; 
                   }
                }
            } else {
               out[i] = out[i-1];
            }
        }
    }
    
  5. cosmin said

    Another Python solution using a deque:

    from collections import deque
    
    def min_sliding_window(a, k):
    	D = deque()
    	res, i = [], 0
    	for i in xrange(len(a)):
    		while D and D[-1][0] >= a[i]:
    			D.pop()
    		D.append((a[i], i+k-1))
    		if i >= k-1: res.append(D[0][0])
    		if i == D[0][1]: D.popleft()
    	return res
    
    print min_sliding_window([4, 3, 2, 1, 5, 7, 6, 8, 9], 3)
    
  6. brooknovak said

    This C# solution uses a binary min heap (see http://en.wikipedia.org/wiki/Binary_heap).
    I’ve omitted the heap implementation here.
    Performance O(k + (k-1)log(k-1) + (n-k) log k) = O(n log(n)).

    public static IEnumerable<int> GetMinimums(int[] numbers, int k) {
    	if (numbers == null)
    		throw new ArgumentNullException ("numbers");
    	if (k <= 0 || k > numbers.Length)
    		throw new ArgumentOutOfRangeException ("k");
    	if (k == 1) {
    		foreach (var n in numbers)
    			yield return n;
    		yield break;
    	}
     
            // O(k)
    	var firstWindowPart = new NumberItem[k - 1];
    	for (int i = 0; i < (k - 1); i++)
    		firstWindowPart [i] = new NumberItem { Index = i, Value = numbers[i] };
    
    	var minHeap = new BinHeap<NumberItem> (firstWindowPart); // O((k-1)log(k-1)) to heapify
    
            // O((n-k) log k) (bin tree inserts = log k)
    	for (int i = k - 1; i < numbers.Length; i++) {
    		minHeap.Insert (new NumberItem { Index = i, Value = numbers[i] });
    		while ((i - minHeap.Peek().Index) >= k) 
    			minHeap.Pop ();
    		yield return minHeap.Peek ().Value;
    	}
    }
    
    private class NumberItem : IComparable
    {
    	public int Value { get; set; }
    	public int Index { get; set; }
    	public int CompareTo(object other)
    	{
    		if (other == null) return 1; 
    		return Value.CompareTo(((NumberItem)other).Value);
    	}
    } 
    

Leave a comment