Streaming Median

May 29, 2012

We have seen heaps in several previous exercises; we’ll use the pairing-heaps algorithm, but any other heap algorithm would also work. We assume the operations pq-empty, pq-empty?, pq-insert, pq-first, and pq-rest.

We change the given algorithm slightly by making a predetermined choice where the original said “it doesn’t matter which,” always checking the left heap before the right heap. This means the main loop is initialized with the first element of the input sequence and enters its main processing loop with the second element of the input sequence:

(define (streaming-medians xs)
  (define (median left lcount right rcount)
    (cond ((< lcount rcount) (pq-first right))
          ((< rcount lcount) (pq-first left))
          (else (/ (+ (pq-first left) (pq-first right)) 2))))
  (if (null? xs) xs
    (let loop ((xs (cdr xs))
               (left (pq-insert < (car xs) pq-empty))
               (lcount 1) (right pq-empty) (rcount 0)
               (ms (list (car xs))))
      (cond ((null? xs) (reverse ms))
            ((< (car xs) (pq-first left))
              (set! left (pq-insert > (car xs) left))
              (set! lcount (+ lcount 1))
              (when (< 1 (- lcount rcount))
                (set! right (pq-insert < (pq-first left) right))
                (set! left (pq-rest > left))
                (set! rcount (+ rcount 1))
                (set! lcount (- lcount 1)))
              (loop (cdr xs) left lcount right rcount
                (cons (median left lcount right rcount) ms)))
              (set! right (pq-insert < (car xs) right))
              (set! rcount (+ rcount 1))
              (when (< 1 (- rcount lcount))
                (set! left (pq-insert > (pq-first right) left))
                (set! right (pq-rest < right))
                (set! lcount (+ lcount 1))
                (set! rcount (- rcount 1)))
              (loop (cdr xs) left lcount right rcount
                (cons (median left lcount right rcount) ms)))))))

This isn’t hard, just tedious. We used set! instead of let because it saves a couple of levels of indentation, and seems clearer. If you’re worried that this somehow makes the code non-functional, don’t; there are assignments on the inside, but each recursive call to loop is “referentially transparent” on the outside. Here’s an example:

> (streaming-medians '(3 7 4 1 2 6 5))
(3 5 4 7/2 3 7/2 4)

You can run the program at


Pages: 1 2

11 Responses to “Streaming Median”

  1. Jussi Piitulainen said

    Python’s heapq library implements min-heaps. I push and pop the smaller numbers in the negative, so that min and max switch their roles. Must be recent Python (possibly 3, which I use, or 2 with the proper incantation) for division to to work as intended.

    from heapq import heappush as push, heappop as pop
    def medians(numbers):
        numbers = iter(numbers)
        less, more = [], []
        first = next(numbers)
        yield first
        second = next(numbers)
        push(less, - min(first, second))
        push(more, max(first, second))
        while True:
            current = ( more[0] if len(less) < len(more)
                        else - less[0] if len(less) > len(more)
                        else (more[0] - less[0]) / 2 )
            yield current
            number = next(numbers)
            if number <= current: push(less, - number)
            else: push(more, number)
            small, big = ( (less, more) if len(less) <= len(more)
                           else (more, less) )
            if len(big) - len(small) > 1: push(small, - pop(big))
    if __name__ == '__main__':
        print(tuple(medians((3, 7, 4, 1, 2, 6, 5))))
        # prints (3, 5.0, 4, 3.5, 3, 3.5, 4)                                        

    (I have been confused about the associativity of median of three in the past. It wasn’t suitable for this problem after all, so I followed the instructions quite humbly this time. No major blunders, hopefully :)

  2. Mike said

    Python 2.7

    I keep the heaps so that they are the same size or the right heap is 1 element bigger than the left heap. That way, the median is the top of the right heap if the right heap is bigger; otherwise, it’s the average of the top of both heaps.

    from heapq import heappop, heappush
    def streaming_median(seq):
        seq = iter(seq)
        left, right  = [], [next(seq)]
        while True:
            yield right[0] if len(right) > len(left) else (right[0] - left[0])/2.0
            heappush(left, -heappushpop(right, next(seq)))
            if len(left) > len(right):
                heappush(right, -heappop(left))
  3. Mike said

    Sorry, the import line was wrong. Here’s a corrected listing:

    from heapq import heappop, heappush, heappushpop
    def streaming_median(seq):
        seq = iter(seq)
        left, right  = [], [next(seq)]
        while True:
            yield right[0] if len(right) > len(left) else (right[0] - left[0])/2.0
            heappush(left, -heappushpop(right, next(seq)))
            if len(left) > len(right):
                heappush(right, -heappop(left))
  4. Jussi Piitulainen said

    Mike, very nice. Here’s the same in Scheme, assuming mutating min-heap operations make-heap, size, least, push!, pop!. It applies a procedure to each consecutive median of a list. (Not tested. I have neither the heap operations nor the time.)

    (define	(for-each-median proc numbers)
      (when (pair? numbers)
          (proc (car numbers))
          (when (pair?	(cdr numbers))
              (let ((left (make-heap)) (right (make-heap)))
                (push! right (cadr numbers))
                (for-each (lambda (number)
                    	(proc (if (< (size left) (size right))
                                      (least right)
                                      (/ (-	(least right) (least left)) 2)))
                    	(push! left (- (pop! right (push! right	number))))
                    	(if (> (size left) (size right))
                                (push! right (- (pop! left)))))
    	              (cddr numbers))))))
  5. radu said

    (define streaming-medians
    (lambda (ls)
    (define median
    (lambda (ls)
    [(= (length ls) 1) (car ls)]
    [(= (length ls) 2) (/ (+ (car ls) (cadr ls)) 2)]
    [else (median (cdr (reverse (cdr (reverse (sort! < ls))))))])))
    (let ([sls '()] [med '()])
    (do ([ls ls (cdr ls)])
    ((null? ls) (reverse med))
    (set! sls (cons (car ls) sls))
    (set! med (cons (median (list-copy sls)) med))))))

  6. Mike said

    It occured to me this morning, by unrolling the loop one iteration, the step of comparing the sizes of the heaps could be eliminated.

    from heapq import push, pushpop
    def streaming_median(seq):
        seq = iter(seq)
        left, right = [], []
        while True:
            push(right, -pushpop(left, -next(seq)))
            yield right[0]
            push(left, -pushpop(right, next(seq)))
            yield (right[0] - left[0]) / 2.0

    It might also be useful to have a function that is “fed” one element at a time from the input stream and that returns the median of all the elements that have been fed to the function so far. That can be done like so:

    from heap import push, pushpop
    def smedian():
        def gen():
            left, right = [], [(yield)]
            while True:
                push(left,  -pushpop(right, (yield right[0])))
                push(right, -pushpop(left, -(yield ((right[0] - left[0])/2.0))))
        g = gen()
        return g
    # an example:
    mediansofar = smedian()
    for n in (3, 7, 4, 1, 2, 6, 5):
            print mediansofar.send(n),
    # output => 3 5.0 4 3.5 3 3.5 4
  7. j2kun said

    This is kind of an ill-posed (albeit clever) solution. If we were really dealing with a stream of numbers, it’d would be infeasible to store the entire stream in memory, and this solution requires us to store the entire history as we go along. Of course, if the process generating the numbers doesn’t change over time, then we can abort this computation after a while and be confident that our streaming median has converged to an appropriate approximation of the true median. But the point of a streaming median is to allow the median to change over time when the process generating the numbers changes.

    Here is a more appropriate solution, which is used in practice and only requires O(1) space (the earliest reference I’ve seen to this method is in McFarlane, “Segmentation and tracking of piglets in images”). Keep a number m representing the current median. Perhaps initialize this to the median of the first few elements of the sequence, or alternatively the first element in the sequence, or some reasonable guess. Then when a new element x arrives, if x is greater than m you increment m by some fixed amount, and if x is less than m you decrement by that fixed amount. It’s clear that as more elements arrive, m will converge to the true median, up to the choice of the increment size.

    I imagine one can add additional spot-checking techniques to update the increment dynamically, but in my applications I only work with integer sequences, so an increment size of 1 suffices. Also, as expected, this technique does not converge nearly as fast as the heap method, especially with a poor initial choice for m. On the other hand, it’s as fast as a streaming median algorithm can possibly be.

  8. parag gupta said
    This would be a nice problem to try implementing using streaming medians.

  9. […] have previously studied algorithms for the streaming median and sliding median that calculate the median of a stream of numbers; the streaming median requires […]

  10. […] streaming median is computed using two heaps. All the numbers less than or equal to the current median are in the […]

  11. […] mediana de transmisión se calcula utilizando dos montones. Todos los números menores o iguales a la mediana actual están […]

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: