Sliding Window Minimum
February 22, 2011
We start with a function that creates a random input list:
(define (make-sequence len symb)
  (map (lambda (x) (randint symb)) (make-list len #f)))
For instance, (make-sequence 25 10) will produce a list of 25 non-negative integers less than 10:
> (make-sequence 25 10)
(5 4 9 3 3 0 8 5 9 8 7 4 9 3 8 4 4 9 3 9 2 4 0 1 0)
The slow algorithm recomputes the minimum at each position of the window:
(define (slow-swm k vs)
  (let loop ((vs vs) (len (- (length vs) k -1)) (zs '()))
    (if (zero? len) (reverse zs)
      (loop (cdr vs) (- len 1)
            (cons (apply min (take k vs)) zs)))))
Here’s an example:
> (slow-swm 3 '(4 3 2 1 5 7 6 8 9))
(2 1 1 1 5 6 6)
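For readers following along in Python, as several commenters do, the same brute-force idea can be sketched in a couple of lines. The name slow_swm simply mirrors the Scheme function; the guard for windows longer than the input is an assumption of this sketch.

```python
def slow_swm(k, vs):
    """Minimum of every length-k window, recomputed from scratch: O(n*k)."""
    # range() is empty when k exceeds len(vs), so the result is then [].
    return [min(vs[i:i + k]) for i in range(len(vs) - k + 1)]


print(slow_swm(3, [4, 3, 2, 1, 5, 7, 6, 8, 9]))  # [2, 1, 1, 1, 5, 6, 6]
```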
Now we begin our look at the fast algorithm by computing the initial queue. First, a helper returns the position of the right-most occurrence of a value in a list:
(define (right-most v vs)
  (let loop ((pos (- (length vs) 1)) (vs (reverse vs)))
    (if (= v (car vs)) pos
      (loop (- pos 1) (cdr vs)))))
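We take the right-most occurrence of the minimum in the window because it stays alive longest; any earlier copy of the same value would leave the window first. The search is then repeated on the elements to the right of each minimum until the initial window is exhausted, and each minimum is paired with the position at which it dies (its index plus k):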
(define (init k vs)
  (let loop ((ws (take k vs)) (as '()))
    (if (null? ws) (reverse as)
      (let* ((m (apply min ws)) (j (right-most m ws)))
        (loop (drop (+ j 1) ws)
              (cons (list m (+ k k (- (length ws)) j)) as))))))
We are using Harter’s terminology: the input list is vs and the queue is as, and later the output list will be rs. The queue is updated using the three-step algorithm given above; the filter performs the first step, the append performs the second step, and the if performs the third step:
(define (update j k v as)
  (let ((new-as (append (filter (lambda (z) (< (car z) v)) as)
                        (list (list v (+ j k))))))
    (if (< j (cadar new-as)) new-as (cdr new-as))))
Now we are ready for the fast solution. At each step we add the current minimum to the growing solution with (cons (caar as) rs), then update the queue and loop until the input is empty, when we add the final minimum to the output before returning it:
(define (fast-swm k vs)
  (let loop ((j k) (vs (drop k vs)) (as (init k vs)) (rs '()))
    (if (null? vs) (reverse (cons (caar as) rs))
      (let ((new-as (update j k (car vs) as)))
        (loop (+ j 1) (cdr vs) new-as (cons (caar as) rs))))))
Here’s an example:
> (fast-swm 3 '(4 3 2 1 5 7 6 8 9))
(2 1 1 1 5 6 6)
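To make the queue mechanics easier to follow, here is a rough Python transcription of the same three-step update, keeping (value, death) pairs just as the Scheme version does. It is a sketch, not a line-for-line port: instead of a separate init, it feeds the first k elements through the same update rule, which should build the same initial queue. The name fast_swm and the guard for invalid k are assumptions of this sketch.

```python
def fast_swm(k, vs):
    """Sliding window minimum using a queue of (value, death) pairs."""
    if k <= 0 or k > len(vs):
        return []
    queue = []  # values strictly increasing from head to tail
    out = []
    for j, v in enumerate(vs):
        # Step 1: keep only entries smaller than the new value
        # (like the Scheme filter, this scans the whole queue).
        queue = [(m, d) for (m, d) in queue if m < v]
        # Step 2: the new value dies k positions from now.
        queue.append((v, j + k))
        # Step 3: drop the head if it has just left the window.
        if queue[0][1] <= j:
            queue.pop(0)
        # Once the first full window has been seen, the head is its minimum.
        if j >= k - 1:
            out.append(queue[0][0])
    return out


print(fast_swm(3, [4, 3, 2, 1, 5, 7, 6, 8, 9]))  # [2, 1, 1, 1, 5, 6, 6]
```

Because the queue is kept in increasing order, the filter in step 1 only ever removes entries from the tail, which is what the deque-based versions exploit to avoid scanning the whole queue.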
This is a tricky little program, and deserves a proper test. The easiest way to do that is to create a bunch of random inputs and confirm that the two algorithms return the same output:
(define (swm-test n)
  (do ((i 0 (+ i 1))) ((= i n))
    (let ((vs (make-sequence (randint 100 1000) (randint 10 500)))
          (k (randint 5 50)))
      (assert (slow-swm k vs) (fast-swm k vs)))))
As always, no news is good news:
> (swm-test 1000)
We used take, drop, randint and assert from the Standard Prelude. You can run the program at http://programmingpraxis.codepad.org/zr59LnZk.
By the way, this past Saturday, February 19th, was the second anniversary of Programming Praxis. My thanks to all my readers, and especially to those who have posted their solutions in the comments. Your response gives me the energy to keep going.
Congrats on two great years, and here’s to many more!
This is my own solution. May not be the better one.
More python ;-)
I was interested in this since I use a number of algorithms that operate on sliding windows. On my machine, when I compare the C code on the tiac.net site to a dumber algorithm, I beat it on random data for a variety of window sizes. What I’ve always done is track just the best minimum and death time. No queues/rings or O(k) storage needed. When the best value dies before a better value is found, you just fall back on the naive O(k) search for the next best value. On strictly ascending data this degrades all the way down to O(nk) performance, but the expected performance on a typical dataset is close to O(n)–especially if the window size is large. Larger windows are better because it’s more likely you won’t ever have k increasing values, meaning you won’t ever have to do the O(k) search.
Here’s a replacement for that tiac.net C code… just thrown together to use his terminology, so hopefully I didn’t make any mistakes. Try it out if you like and see if your results improve or not in your favorite language:
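A minimal Python sketch of the idea described in the previous comment (it is not the C replacement itself): keep only the index of the current minimum, and fall back on a full O(k) rescan when that minimum leaves the window. The name lazy_swm and the choice to prefer the right-most minimum on ties are assumptions of this sketch.

```python
def lazy_swm(k, vs):
    """Sliding window minimum tracking one minimum and its death time."""
    n = len(vs)
    if k <= 0 or k > n:
        return []
    out = []
    best = -1  # index of the current minimum; it dies once best < window start
    for j in range(k - 1, n):  # j is the right edge of the window
        start = j - k + 1
        if best < start:
            # The tracked minimum has died: naive O(k) rescan of the window,
            # preferring the right-most minimum so it lives as long as possible.
            best = start
            for i in range(start + 1, j + 1):
                if vs[i] <= vs[best]:
                    best = i
        elif vs[j] <= vs[best]:
            # The incoming value is at least as good and will live longer.
            best = j
        out.append(vs[best])
    return out


print(lazy_swm(3, [4, 3, 2, 1, 5, 7, 6, 8, 9]))  # [2, 1, 1, 1, 5, 6, 6]
```

On strictly ascending input the rescan branch runs at every step, which is the O(nk) worst case mentioned above.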
Another Python solution using a deque:
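A minimal sketch of how a deque-based version might look, which is not necessarily the commenter's own code. It stores indices rather than (value, death) pairs, so an index i is dead once i <= j - k. The name deque_swm is an assumption of this sketch.

```python
from collections import deque


def deque_swm(k, vs):
    """Sliding window minimum in O(n) using a deque of indices."""
    if k <= 0 or k > len(vs):
        return []
    dq = deque()  # indices whose values increase from front to back
    out = []
    for j, v in enumerate(vs):
        # Values at the back that are not smaller than v can never be a
        # window minimum again, so pop them.
        while dq and vs[dq[-1]] >= v:
            dq.pop()
        dq.append(j)
        # The front index has fallen out of the window ending at j.
        if dq[0] <= j - k:
            dq.popleft()
        if j >= k - 1:
            out.append(vs[dq[0]])
    return out


print(deque_swm(3, [4, 3, 2, 1, 5, 7, 6, 8, 9]))  # [2, 1, 1, 1, 5, 6, 6]
```

Each index is appended once and popped at most once, so the total work is linear in the length of the input.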
This C# solution uses a binary min heap (see http://en.wikipedia.org/wiki/Binary_heap).
I’ve omitted the heap implementation here.
Performance O(k + (k-1)log(k-1) + (n-k) log k) = O(n log(n)).
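A rough Python sketch of a heap-based variant, using the standard heapq module and lazy deletion; this may well differ from the C# solution mentioned above. Stale entries are only discarded when they reach the top of the heap, so the heap can hold up to n entries, which gives an O(n log n) worst case. The name heap_swm is an assumption of this sketch.

```python
import heapq


def heap_swm(k, vs):
    """Sliding window minimum using a binary min-heap with lazy deletion."""
    if k <= 0 or k > len(vs):
        return []
    heap = []  # (value, index) pairs; may contain indices already out of the window
    out = []
    for j, v in enumerate(vs):
        heapq.heappush(heap, (v, j))
        if j >= k - 1:
            # Discard stale minima; (v, j) itself is always in the window,
            # so the heap never becomes empty here.
            while heap[0][1] <= j - k:
                heapq.heappop(heap)
            out.append(heap[0][0])
    return out


print(heap_swm(3, [4, 3, 2, 1, 5, 7, 6, 8, 9]))  # [2, 1, 1, 1, 5, 6, 6]
```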