## Streaming Median

### May 29, 2012

We have seen heaps in several previous exercises; we’ll use the pairing-heaps algorithm, but any other heap algorithm would also work. We assume the operations `pq-empty`, `pq-empty?`, `pq-insert`, `pq-first`, and `pq-rest`.

We change the given algorithm slightly by making a predetermined choice where the original said “it doesn’t matter which,” always checking the left heap before the right heap. This means the main loop is initialized with the first element of the input sequence and enters its main processing loop with the second element of the input sequence:

```(define (streaming-medians xs)   (define (median left lcount right rcount)     (cond ((< lcount rcount) (pq-first right))           ((< rcount lcount) (pq-first left))           (else (/ (+ (pq-first left) (pq-first right)) 2))))   (if (null? xs) xs     (let loop ((xs (cdr xs))                (left (pq-insert < (car xs) pq-empty))                (lcount 1) (right pq-empty) (rcount 0)                (ms (list (car xs))))       (cond ((null? xs) (reverse ms))             ((< (car xs) (pq-first left))               (set! left (pq-insert > (car xs) left))               (set! lcount (+ lcount 1))               (when (< 1 (- lcount rcount))                 (set! right (pq-insert < (pq-first left) right))                 (set! left (pq-rest > left))                 (set! rcount (+ rcount 1))                 (set! lcount (- lcount 1)))               (loop (cdr xs) left lcount right rcount                 (cons (median left lcount right rcount) ms)))             (else               (set! right (pq-insert < (car xs) right))               (set! rcount (+ rcount 1))               (when (< 1 (- rcount lcount))                 (set! left (pq-insert > (pq-first right) left))                 (set! right (pq-rest < right))                 (set! lcount (+ lcount 1))                 (set! rcount (- rcount 1)))               (loop (cdr xs) left lcount right rcount                 (cons (median left lcount right rcount) ms)))))))```

This isn’t hard, just tedious. We used `set!` instead of `let` because it saves a couple of levels of indentation, and seems clearer. If you’re worried that this somehow makes the code non-functional, don’t; there are assignments on the inside, but each recursive call to `loop` is “referentially transparent” on the outside. Here’s an example:

```> (streaming-medians '(3 7 4 1 2 6 5)) (3 5 4 7/2 3 7/2 4)```

You can run the program at http://programmingpraxis.codepad.org/uPxttoPU.

Pages: 1 2

### 8 Responses to “Streaming Median”

1. Jussi Piitulainen said

Python’s heapq library implements min-heaps. I push and pop the smaller numbers in the negative, so that min and max switch their roles. Must be recent Python (possibly 3, which I use, or 2 with the proper incantation) for division to to work as intended.

```from heapq import heappush as push, heappop as pop

def medians(numbers):
numbers = iter(numbers)
less, more = [], []
first = next(numbers)
yield first
second = next(numbers)
push(less, - min(first, second))
push(more, max(first, second))
while True:
current = ( more if len(less) < len(more)
else - less if len(less) > len(more)
else (more - less) / 2 )
yield current
number = next(numbers)
if number <= current: push(less, - number)
else: push(more, number)
small, big = ( (less, more) if len(less) <= len(more)
else (more, less) )
if len(big) - len(small) > 1: push(small, - pop(big))

if __name__ == '__main__':
print(tuple(medians((3, 7, 4, 1, 2, 6, 5))))
# prints (3, 5.0, 4, 3.5, 3, 3.5, 4)
```

(I have been confused about the associativity of median of three in the past. It wasn’t suitable for this problem after all, so I followed the instructions quite humbly this time. No major blunders, hopefully :)

2. Mike said

Python 2.7

I keep the heaps so that they are the same size or the right heap is 1 element bigger than the left heap. That way, the median is the top of the right heap if the right heap is bigger; otherwise, it’s the average of the top of both heaps.

```from heapq import heappop, heappush

def streaming_median(seq):
seq = iter(seq)
left, right  = [], [next(seq)]
while True:
yield right if len(right) > len(left) else (right - left)/2.0

heappush(left, -heappushpop(right, next(seq)))
if len(left) > len(right):
heappush(right, -heappop(left))
```
3. Mike said

Sorry, the import line was wrong. Here’s a corrected listing:

```from heapq import heappop, heappush, heappushpop

def streaming_median(seq):
seq = iter(seq)
left, right  = [], [next(seq)]
while True:
yield right if len(right) > len(left) else (right - left)/2.0

heappush(left, -heappushpop(right, next(seq)))
if len(left) > len(right):
heappush(right, -heappop(left))
```
4. Jussi Piitulainen said

Mike, very nice. Here’s the same in Scheme, assuming mutating min-heap operations make-heap, size, least, push!, pop!. It applies a procedure to each consecutive median of a list. (Not tested. I have neither the heap operations nor the time.)

```(define	(for-each-median proc numbers)
(when (pair? numbers)
(proc (car numbers))
(when (pair?	(cdr numbers))
(let ((left (make-heap)) (right (make-heap)))
(for-each (lambda (number)
(proc (if (< (size left) (size right))
(least right)
(/ (-	(least right) (least left)) 2)))
(push! left (- (pop! right (push! right	number))))
(if (> (size left) (size right))
(push! right (- (pop! left)))))
(cddr numbers))))))
```

(define streaming-medians
(lambda (ls)
(define median
(lambda (ls)
(cond
[(= (length ls) 1) (car ls)]
[(= (length ls) 2) (/ (+ (car ls) (cadr ls)) 2)]
[else (median (cdr (reverse (cdr (reverse (sort! < ls))))))])))
(let ([sls '()] [med '()])
(do ([ls ls (cdr ls)])
((null? ls) (reverse med))
(set! sls (cons (car ls) sls))
(set! med (cons (median (list-copy sls)) med))))))

6. Mike said

It occured to me this morning, by unrolling the loop one iteration, the step of comparing the sizes of the heaps could be eliminated.

```from heapq import push, pushpop

def streaming_median(seq):
seq = iter(seq)
left, right = [], []

while True:
push(right, -pushpop(left, -next(seq)))
yield right

push(left, -pushpop(right, next(seq)))
yield (right - left) / 2.0

```

It might also be useful to have a function that is “fed” one element at a time from the input stream and that returns the median of all the elements that have been fed to the function so far. That can be done like so:

```from heap import push, pushpop

def smedian():
def gen():
left, right = [], [(yield)]
while True:
push(left,  -pushpop(right, (yield right)))
push(right, -pushpop(left, -(yield ((right - left)/2.0))))
g = gen()
next(g)
return g

# an example:
mediansofar = smedian()
for n in (3, 7, 4, 1, 2, 6, 5):
print mediansofar.send(n),

# output => 3 5.0 4 3.5 3 3.5 4
```
7. j2kun said

This is kind of an ill-posed (albeit clever) solution. If we were really dealing with a stream of numbers, it’d would be infeasible to store the entire stream in memory, and this solution requires us to store the entire history as we go along. Of course, if the process generating the numbers doesn’t change over time, then we can abort this computation after a while and be confident that our streaming median has converged to an appropriate approximation of the true median. But the point of a streaming median is to allow the median to change over time when the process generating the numbers changes.

Here is a more appropriate solution, which is used in practice and only requires O(1) space (the earliest reference I’ve seen to this method is in McFarlane, “Segmentation and tracking of piglets in images”). Keep a number m representing the current median. Perhaps initialize this to the median of the first few elements of the sequence, or alternatively the first element in the sequence, or some reasonable guess. Then when a new element x arrives, if x is greater than m you increment m by some fixed amount, and if x is less than m you decrement by that fixed amount. It’s clear that as more elements arrive, m will converge to the true median, up to the choice of the increment size.

I imagine one can add additional spot-checking techniques to update the increment dynamically, but in my applications I only work with integer sequences, so an increment size of 1 suffices. Also, as expected, this technique does not converge nearly as fast as the heap method, especially with a poor initial choice for m. On the other hand, it’s as fast as a streaming median algorithm can possibly be.

8. parag gupta said

http://www.spoj.pl/problems/WEIRDFN/
This would be a nice problem to try implementing using streaming medians.