Sliding Window Minimum

February 22, 2011

The sliding window minimum problem takes a list of n numbers and a window size k and returns a list of the minimum values in each of the n − k + 1 successive windows. For instance, given the list {4, 3, 2, 1, 5, 7, 6, 8, 9} and a window of size 3, the desired output is the list {2, 1, 1, 1, 5, 6, 6}. Richard Harter discusses this problem on his blog, along with several different solutions.

The obvious solution is to report the minimum of the first k elements of the list, slide one position down the list, take the minimum of the first k elements starting at the new position, and so on, until there are fewer than k elements remaining. Each window is scanned in full, so this takes O(nk) time.
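
As a minimal sketch of this brute-force approach (the function name `sliding_min_naive` is made up for illustration and does not come from Harter), each window is sliced out and scanned in full:

```python
def sliding_min_naive(xs, k):
    """Return the minimum of each of the n-k+1 windows of size k."""
    if k <= 0:
        raise ValueError("window size must be positive")
    # range() is empty when k > len(xs), so no windows are reported.
    return [min(xs[i:i + k]) for i in range(len(xs) - k + 1)]


print(sliding_min_naive([4, 3, 2, 1, 5, 7, 6, 8, 9], 3))
# prints [2, 1, 1, 1, 5, 6, 6]
```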

Harter presents a better solution, which he calls the ascending minima algorithm, that requires O(n) time and O(k) space. He uses an auxiliary data structure, a queue, that is initialized with the minimum value of the initial window, followed by the minimum value of those items in the initial window that follow the right-most occurrence of the minimum value, followed by the minimum value of those items in the initial window that follow the right-most occurrence of that second minimum, and so on, up to a maximum of k ascending minimums; each minimum is paired with the index of the position where the minimum disappears from the window. For instance, with k=6 and the first six items of the input list {5, 2, 8, 6, 4, 7}, the queue will have three pairs (2 7), (4 10), and (7 11), indicating that 2 is the minimum value up to and including the 7th list element, then 4 is the minimum value for the 8th, 9th and 10th elements, and so on, unless a smaller item appears.
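
To make the initialization concrete, here is a sketch that builds the queue directly from that description. The name `initial_queue` is an assumption for illustration, the input is assumed to hold at least k items, and the death index is computed as the item's zero-based position plus k (equivalently, its one-based position plus k − 1), which reproduces the pairs in the example above.

```python
from collections import deque


def initial_queue(xs, k):
    """Build the ascending-minima queue for the first window xs[:k].

    Each entry is a (value, death_index) pair, where death_index is the
    zero-based position of the value plus k. Assumes len(xs) >= k >= 1.
    """
    q = deque()
    start = 0
    while start < k:
        window = xs[start:k]
        m = min(window)
        # Right-most occurrence of m among zero-based positions start..k-1.
        pos = k - 1 - window[::-1].index(m)
        q.append((m, pos + k))
        start = pos + 1  # continue with the items that follow it
    return q


print(list(initial_queue([5, 2, 8, 6, 4, 7], 6)))
# prints [(2, 7), (4, 10), (7, 11)]
```

This literal construction rescans the remainder of the window for each entry, so it can take O(k²) steps on an increasing window; it is meant to mirror the definition, not to be the fastest way to build the queue.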

Once the queue of ascending minimums is initialized, output is produced by emitting the first value in the queue then updating the queue with the next item beyond the end of the current sliding window using the following three-step process:

  1. remove from the queue all items with value greater than the incoming item,
  2. append the incoming item to the end of the queue, along with its “death index,” and
  3. remove the head of the queue if the current index is beyond its death index.
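
The three steps can be sketched as a single update function. The name `update` and its argument order are assumptions for illustration; positions are one-based here, and an item arriving at position `pos` is given the death index `pos + k - 1`, the last position at which it is still inside the window.

```python
from collections import deque


def update(q, x, pos, k):
    """Slide the window one step so that it ends at one-based position pos.

    q holds (value, death_index) pairs with ascending values; x is the
    incoming item, i.e. the item at position pos.
    """
    # Step 1: drop queued items that are greater than the incoming item.
    # The values are ascending, so all of them sit at the tail.
    while q and q[-1][0] > x:
        q.pop()
    # Step 2: append the incoming item with its death index.
    q.append((x, pos + k - 1))
    # Step 3: drop the head once the window has moved past its death index.
    if pos > q[0][1]:
        q.popleft()
    return q


print(list(update(deque([(2, 5)]), 1, 4, 3)))
# prints [(1, 6)]
```

Because every item is appended once and removed at most once, the total work over the whole input stays linear even though step 1 is a loop.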

Let’s look again at the list {4, 3, 2, 1, 5, 7, 6, 8, 9} with a window of size 3. The initial queue has the single entry (2 5), because the minimum item is the last element of the initial window (the largest possible queue arises when the window has monotonically increasing values); the queue entry indicates that the minimum value will be 2 until after the 5th item in the input, when that minimum value “dies.” The initial minimum 2 is output and the new item 1 is added to the queue; since 2 is greater than 1, the (2 5) entry is removed from the queue, a new (1 6) entry is added to the end of the queue, and since the current index 4 is less than the death index 6, we proceed to the next item. Now we output the current minimum 1, add an entry (5 7) to the queue, and move on. Again we output the current minimum 1, add an entry (7 8) to the queue, and move on. One more time we output the current minimum 1; the incoming item 6 removes the (7 8) entry, an entry (6 9) is added, and (1 6) is deleted from the head of the queue since the current index 7 has passed its death index; the queue now contains the items (5 7) and (6 9). Going forward, we output 5, add (8 10) and delete (5 7) from the head of the queue, output 6, add (9 11), and output 6 again, at which point we stop because the current index has reached the end of the input.
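
The walk-through can be checked mechanically. The sketch below, with the hypothetical name `ascending_minima_trace`, uses one-based positions and yields each output together with the queue contents behind it. As a simplification of the initialization described earlier, the first k items are simply fed through the same three update steps; this is an assumption of the sketch, chosen because it gives the same outputs with less code.

```python
from collections import deque


def ascending_minima_trace(xs, k):
    """Yield (minimum, queue snapshot) for each window of size k in xs."""
    q = deque()  # (value, death_index) pairs, values ascending

    def push(x, pos):
        while q and q[-1][0] > x:   # step 1: drop larger items
            q.pop()
        q.append((x, pos + k - 1))  # step 2: append with death index
        if pos > q[0][1]:           # step 3: retire an expired head
            q.popleft()

    for pos, x in enumerate(xs, start=1):
        push(x, pos)
        if pos >= k:  # a full window ends at this position
            yield q[0][0], list(q)


trace = list(ascending_minima_trace([4, 3, 2, 1, 5, 7, 6, 8, 9], 3))
print([m for m, _ in trace])
# prints [2, 1, 1, 1, 5, 6, 6]
for m, snapshot in trace:
    print(m, snapshot)  # one line per window: the output and its queue
```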

Your task is to write two functions that solve the sliding window minimum problem, using the two algorithms described above. When you are finished, you are welcome to read or run a suggested solution, or to post your own solution or discuss the exercise in the comments below.

6 Responses to “Sliding Window Minimum”

  1. Graham said

    Congrats on two great years, and here’s to many more!

    #!/usr/bin/env python3
    from collections import deque
    
    
    def obvious(k, xs):
        # minimum of each window, recomputed from scratch: O(n*k)
        mins = []
        for i in range(len(xs) - k + 1):
            mins.append(min(xs[i:i + k]))
        return mins
    
    
    def _push(q, k, i, x):
        # drop queued values greater than the incoming item; they can
        # never be a window minimum again
        while q and q[-1][0] > x:
            q.pop()
        # x == xs[i] leaves the window once the index reaches i + k
        q.append((x, i + k))
    
    
    def _init_queue(k, xs):
        # ascending minima of the first window, as (value, death) pairs
        q = deque()
        for i in range(k):
            _push(q, k, i, xs[i])
        return q
    
    
    def harter(k, xs):
        if k > len(xs):
            return []
        q = _init_queue(k, xs)
        mins = [q[0][0]]
        for i in range(k, len(xs)):
            _push(q, k, i, xs[i])
            if q[0][1] <= i:  # head has slid out of the window
                q.popleft()
            mins.append(q[0][0])
        return mins
    
    
    ### Added later: testing
    from random import randrange
    
    
    def comp_methods(k, xs):
        return obvious(k, xs) == harter(k, xs)
    
    
    def n_tests(n):
        for _ in range(n):
            l = randrange(10, 1001)
            k = randrange(1, l + 1)  # include small windows such as k = 1, 2
            xs = [randrange(10, 1001) for _ in range(l)]
            if not comp_methods(k, xs):
                print(k)
                print(xs)
                print(obvious(k, xs))
                print(harter(k, xs))
                return False
        return True
    
    
    if __name__ == "__main__":
        xs = [4, 3, 2, 1, 5, 7, 6, 8, 9]
        print(obvious(3, xs))
        print(harter(3, xs))
        print(n_tests(100))
    
  2. Lautaro Pecile said

    This is my own solution. It may not be the best one.

    import itertools
    
    def take(n, it):
        return list(itertools.islice(it, n))
    
    def tails(it):
        while True:
            tail, it = itertools.tee(it)
            yield list(tail)
            next(it)
    
    def swm(it, k):
        result = [min(take(k, l)) for l in 
                    itertools.takewhile(lambda l: len(l) >= k, tails(it))]
        return result
    
  3. Mike said

    More python ;-)

    #
    # this is the 'obvious' approach
    #
    from collections import deque
    from itertools import islice, tee, count, repeat
    
    def win_min_1(iterable, ws):
        seq = iter(iterable)
        d = deque(islice(seq, ws), ws)
        if ws < 1 or len(d) < ws:
            return              # no full window to report
        yield min(d)
        for x in seq:
            d.append(x)         # maxlen=ws drops the oldest item
            yield min(d)
    
    #
    # a variation on the 'obvious' approach using parallel iterators
    #
    def win_min_2(iterable, ws):
        # the i-th copy of the input is advanced by i items, so zip()
        # yields one tuple per window
        return map(min, zip(*map(islice, tee(iterable, ws), count(), repeat(None))))
    
    #
    # the 'improved' algorithm
    #
    def win_min_3(iterable, ws):
        q = deque()     # (value, death index) pairs, values ascending
    
        for n, v in enumerate(iterable):
            # discard 'big' values from the end of the queue
            while q and q[-1][0] >= v:
                q.pop()
    
            q.append((v, n+ws))
    
            # discard a 'too old' value from the front of the queue
            if q[0][1] <= n:
                q.popleft()
    
            # the first ws-1 items only fill the initial window
            if n >= ws-1:
                yield q[0][0]
    
    
  4. Richard said

    I was interested in this since I use a number of algorithms that operate on sliding windows. On my machine, when I compare the C code on the tiac.net site to a dumber algorithm, I beat it on random data for a variety of window sizes. What I’ve always done is track just the best minimum and death time. No queues/rings or O(k) storage needed. When the best value dies before a better value is found, you just fall back on the naive O(k) search for the next best value. On strictly ascending data this degrades all the way down to O(nk) performance, but the expected performance on a typical dataset is close to O(n)–especially if the window size is large. Larger windows are better because it’s more likely you won’t ever have k increasing values, meaning you won’t ever have to do the O(k) search.

    Here’s a replacement for that tiac.net C code… just thrown together to use his terminology, so hopefully I didn’t make any mistakes. Try it out if you like and see if your results improve or not in your favorite language:

    void
    minwindow(int *in, int *out, int n, int k)
    {
        int i;
        int death = k;
        out[0] = in[0];
    
        for (i=1;i<n;i++) {
            if(in[i] <= out[i-1]) {
               out[i] = in[i];
               death = i+k;
            } else if(death == i) {
                int j;
                out[i] = in[i-k+1];
                death = i+1;
                for(j = (i-k)+2; j <= i; ++j) {
                   if(in[j] <= out[i]) { 
                      out[i] = in[j]; 
                      death = j+k; 
                   }
                }
            } else {
               out[i] = out[i-1];
            }
        }
    }
    
  5. cosmin said

    Another Python solution using a deque:

    from collections import deque
    
    def min_sliding_window(a, k):
        D = deque()  # (value, index of the last window containing it)
        res = []
        for i in range(len(a)):
            while D and D[-1][0] >= a[i]:
                D.pop()
            D.append((a[i], i+k-1))
            if i >= k-1: res.append(D[0][0])
            if i == D[0][1]: D.popleft()
        return res
    
    print(min_sliding_window([4, 3, 2, 1, 5, 7, 6, 8, 9], 3))
    
  6. brooknovak said

    This C# solution uses a binary min heap (see http://en.wikipedia.org/wiki/Binary_heap).
    I’ve omitted the heap implementation here.
    Performance O(k + (k-1)log(k-1) + (n-k) log k) = O(n log(n)).

    public static IEnumerable<int> GetMinimums(int[] numbers, int k) {
    	if (numbers == null)
    		throw new ArgumentNullException ("numbers");
    	if (k <= 0 || k > numbers.Length)
    		throw new ArgumentOutOfRangeException ("k");
    	if (k == 1) {
    		foreach (var n in numbers)
    			yield return n;
    		yield break;
    	}
     
            // O(k)
    	var firstWindowPart = new NumberItem[k - 1];
    	for (int i = 0; i < (k - 1); i++)
    		firstWindowPart [i] = new NumberItem { Index = i, Value = numbers[i] };
    
    	var minHeap = new BinHeap<NumberItem> (firstWindowPart); // O((k-1)log(k-1)) to heapify
    
            // O((n-k) log k) (bin tree inserts = log k)
    	for (int i = k - 1; i < numbers.Length; i++) {
    		minHeap.Insert (new NumberItem { Index = i, Value = numbers[i] });
    		while ((i - minHeap.Peek().Index) >= k) 
    			minHeap.Pop ();
    		yield return minHeap.Peek ().Value;
    	}
    }
    
    private class NumberItem : IComparable
    {
    	public int Value { get; set; }
    	public int Index { get; set; }
    	public int CompareTo(object other)
    	{
    		if (other == null) return 1; 
    		return Value.CompareTo(((NumberItem)other).Value);
    	}
    } 
    
