## Streaming Median

### May 29, 2012

The median of a set of numbers is the number in the middle when the items are arranged in sorted order, when the number of items is odd, or the average of the two numbers in the middle, when the number of items is even; for instance, the median of {3 7 4 1 2 6 5} is 4 and the median of {4 2 1 3} is 2.5. The normal algorithm for computing the median considers the entire set of numbers at once; the streaming median algorithm recalculates the median of each successive prefix of the set of numbers, and can be applied to a prefix of an infinite sequence. For instance, the streaming medians of the original sequence of numbers are 3, 5, 4, 3.5, 3, 3.5, and 4.

The streaming median is computed using two heaps. All the numbers less than or equal to the current median are in the left heap, which is arranged so that the maximum number is at the root of the heap. All the numbers greater than or equal to the current median are in the right heap, which is arranged so that the minimum number is at the root of the heap. Note that numbers equal to the current median can be in either heap. The count of numbers in the two heaps never differs by more than 1.

When the process begins the two heaps are initially empty. The first number in the input sequence is added to one of the heaps, it doesn’t matter which, and returned as the first streaming median. The second number in the input sequence is then added to the other heap, if the root of the right heap is less than the root of the left heap the two heaps are swapped, and the average of the two numbers is returned as the second streaming median.

Then the main algorithm begins. Each subsequent number in the input sequence is compared to the current median, and added to the left heap if it is less than the current median or to the right heap if it is greater than the current median; if the input number is equal to the current median, it is added to whichever heap has the smaller count, or to either heap arbitrarily if they have the same count. If that causes the counts of the two heaps to differ by more than 1, the root of the larger heap is removed and inserted in the smaller heap. Then the current median is computed as the root of the larger heap, if they differ in count, or the average of the roots of the two heaps, if they are the same size.

Your task is to write a function that computes the streaming medians of a sequence. When you are finished, you are welcome to read or run a suggested solution, or to post your own solution or discuss the exercise in the comments below.

Pages: 1 2

### 8 Responses to “Streaming Median”

1. Jussi Piitulainen said

Python’s heapq library implements min-heaps. I push and pop the smaller numbers in the negative, so that min and max switch their roles. Must be recent Python (possibly 3, which I use, or 2 with the proper incantation) for division to to work as intended.

```from heapq import heappush as push, heappop as pop

def medians(numbers):
numbers = iter(numbers)
less, more = [], []
first = next(numbers)
yield first
second = next(numbers)
push(less, - min(first, second))
push(more, max(first, second))
while True:
current = ( more[0] if len(less) < len(more)
else - less[0] if len(less) > len(more)
else (more[0] - less[0]) / 2 )
yield current
number = next(numbers)
if number <= current: push(less, - number)
else: push(more, number)
small, big = ( (less, more) if len(less) <= len(more)
else (more, less) )
if len(big) - len(small) > 1: push(small, - pop(big))

if __name__ == '__main__':
print(tuple(medians((3, 7, 4, 1, 2, 6, 5))))
# prints (3, 5.0, 4, 3.5, 3, 3.5, 4)
```

(I have been confused about the associativity of median of three in the past. It wasn’t suitable for this problem after all, so I followed the instructions quite humbly this time. No major blunders, hopefully :)

2. Mike said

Python 2.7

I keep the heaps so that they are the same size or the right heap is 1 element bigger than the left heap. That way, the median is the top of the right heap if the right heap is bigger; otherwise, it’s the average of the top of both heaps.

```from heapq import heappop, heappush

def streaming_median(seq):
seq = iter(seq)
left, right  = [], [next(seq)]
while True:
yield right[0] if len(right) > len(left) else (right[0] - left[0])/2.0

heappush(left, -heappushpop(right, next(seq)))
if len(left) > len(right):
heappush(right, -heappop(left))
```
3. Mike said

Sorry, the import line was wrong. Here’s a corrected listing:

```from heapq import heappop, heappush, heappushpop

def streaming_median(seq):
seq = iter(seq)
left, right  = [], [next(seq)]
while True:
yield right[0] if len(right) > len(left) else (right[0] - left[0])/2.0

heappush(left, -heappushpop(right, next(seq)))
if len(left) > len(right):
heappush(right, -heappop(left))
```
4. Jussi Piitulainen said

Mike, very nice. Here’s the same in Scheme, assuming mutating min-heap operations make-heap, size, least, push!, pop!. It applies a procedure to each consecutive median of a list. (Not tested. I have neither the heap operations nor the time.)

```(define	(for-each-median proc numbers)
(when (pair? numbers)
(proc (car numbers))
(when (pair?	(cdr numbers))
(let ((left (make-heap)) (right (make-heap)))
(for-each (lambda (number)
(proc (if (< (size left) (size right))
(least right)
(/ (-	(least right) (least left)) 2)))
(push! left (- (pop! right (push! right	number))))
(if (> (size left) (size right))
(push! right (- (pop! left)))))
(cddr numbers))))))
```

(define streaming-medians
(lambda (ls)
(define median
(lambda (ls)
(cond
[(= (length ls) 1) (car ls)]
[(= (length ls) 2) (/ (+ (car ls) (cadr ls)) 2)]
[else (median (cdr (reverse (cdr (reverse (sort! < ls))))))])))
(let ([sls '()] [med '()])
(do ([ls ls (cdr ls)])
((null? ls) (reverse med))
(set! sls (cons (car ls) sls))
(set! med (cons (median (list-copy sls)) med))))))

6. Mike said

It occured to me this morning, by unrolling the loop one iteration, the step of comparing the sizes of the heaps could be eliminated.

```from heapq import push, pushpop

def streaming_median(seq):
seq = iter(seq)
left, right = [], []

while True:
push(right, -pushpop(left, -next(seq)))
yield right[0]

push(left, -pushpop(right, next(seq)))
yield (right[0] - left[0]) / 2.0

```

It might also be useful to have a function that is “fed” one element at a time from the input stream and that returns the median of all the elements that have been fed to the function so far. That can be done like so:

```from heap import push, pushpop

def smedian():
def gen():
left, right = [], [(yield)]
while True:
push(left,  -pushpop(right, (yield right[0])))
push(right, -pushpop(left, -(yield ((right[0] - left[0])/2.0))))
g = gen()
next(g)
return g

# an example:
mediansofar = smedian()
for n in (3, 7, 4, 1, 2, 6, 5):
print mediansofar.send(n),

# output => 3 5.0 4 3.5 3 3.5 4
```
7. j2kun said

This is kind of an ill-posed (albeit clever) solution. If we were really dealing with a stream of numbers, it’d would be infeasible to store the entire stream in memory, and this solution requires us to store the entire history as we go along. Of course, if the process generating the numbers doesn’t change over time, then we can abort this computation after a while and be confident that our streaming median has converged to an appropriate approximation of the true median. But the point of a streaming median is to allow the median to change over time when the process generating the numbers changes.

Here is a more appropriate solution, which is used in practice and only requires O(1) space (the earliest reference I’ve seen to this method is in McFarlane, “Segmentation and tracking of piglets in images”). Keep a number m representing the current median. Perhaps initialize this to the median of the first few elements of the sequence, or alternatively the first element in the sequence, or some reasonable guess. Then when a new element x arrives, if x is greater than m you increment m by some fixed amount, and if x is less than m you decrement by that fixed amount. It’s clear that as more elements arrive, m will converge to the true median, up to the choice of the increment size.

I imagine one can add additional spot-checking techniques to update the increment dynamically, but in my applications I only work with integer sequences, so an increment size of 1 suffices. Also, as expected, this technique does not converge nearly as fast as the heap method, especially with a poor initial choice for m. On the other hand, it’s as fast as a streaming median algorithm can possibly be.

8. parag gupta said

http://www.spoj.pl/problems/WEIRDFN/
This would be a nice problem to try implementing using streaming medians.