MapReduce

October 6, 2009

MapReduce requires a dictionary of key/value pairs. We could use a hash table or treaps, but we choose a custom version of red-black trees, which we discussed in the previous exercise. The change is to the insert function, which must combine like items with the reducer rather than simply replace existing items with new ones; this is better than using the standard red-black tree insert function, because that would require a lookup to find out if the item already exists. Additionally, moving the red-black tree functions inside the map-reduce function eliminates the need to pass the lt? parameter to all the functions and avoids pollution of the namespace:

; map-reduce: apply mapper to each element of items, accumulate the
; resulting key/value pairs in a red-black tree ordered by lt?, and return
; a list of (key . value) pairs produced by an in-order walk of that tree.
;   mapper  -- called with one item; must return two values, a key and a value
;   reducer -- called as (reducer key stored-value new-value) when the key is
;              already in the tree; its result replaces the stored value.
;              It is not called for the first occurrence of a key, whose
;              value is stored exactly as mapper returned it.
;   lt?     -- less-than predicate on keys; two keys count as the same key
;              when neither is lt? the other
;   items   -- list of inputs
; Internals: a node is a five-element vector (color key value left right).
; The single vector bound to empty is the leaf sentinel and is recognised by
; identity (eqv?). balance rewrites a black node that has a red child with a
; red grandchild (four mirror-image cases) into a red node with two black
; children, and otherwise rebuilds the node unchanged. insert descends with
; ins, rebalancing on the way back up, and always colors the root black.
; enlist conses the pairs onto base, visiting right subtree, node, then left
; subtree, so the finished list runs from the leftmost key to the rightmost.
(define (map-reduce mapper reducer lt? items)
  (define (tree c k v l r) (vector c k v l r))
  (define empty (tree 'black 'nil 'nil 'nil 'nil))
  (define (empty? t) (eqv? t empty))
  (define (color t) (vector-ref t 0))
  (define (key t) (vector-ref t 1))
  (define (value t) (vector-ref t 2))
  (define (lkid t) (vector-ref t 3))
  (define (rkid t) (vector-ref t 4))
  (define (red? c) (eqv? c 'red))
  (define (black? c) (eqv? c 'black))
  (define (balance c k v l r)
    (cond ((and (black? c) (red? (color l)) (red? (color (lkid l))))
            (tree 'red (key l) (value l)
              (tree 'black (key (lkid l)) (value (lkid l))
                (lkid (lkid l)) (rkid (lkid l)))
              (tree 'black k v (rkid l) r)))
          ((and (black? c) (red? (color l)) (red? (color (rkid l))))
            (tree 'red (key (rkid l)) (value (rkid l))
              (tree 'black (key l) (value l) (lkid l) (lkid (rkid l)))
              (tree 'black k v (rkid (rkid l)) r)))
          ((and (black? c) (red? (color r)) (red? (color (lkid r))))
            (tree 'red (key (lkid r)) (value (lkid r))
              (tree 'black k v l (lkid (lkid r)))
              (tree 'black (key r) (value r) (rkid (lkid r)) (rkid r))))
          ((and (black? c) (red? (color r)) (red? (color (rkid r))))
            (tree 'red (key r) (value r)
              (tree 'black k v l (lkid r))
              (tree 'black (key (rkid r)) (value (rkid r))
                (lkid (rkid r)) (rkid (rkid r)))))
          (else (tree c k v l r))))
  (define (insert t k v)
    (define (ins t)
      (let ((tc (color t)) (tk (key t)) (tv (value t)) (tl (lkid t)) (tr (rkid t)))
        (cond ((empty? t) (tree 'red k v empty empty))
              ((lt? k tk) (balance tc tk tv (ins tl) tr))
              ((lt? tk k) (balance tc tk tv tl (ins tr)))
              (else (tree tc tk (reducer k tv v) tl tr)))))
    (let* ((z (ins t)) (zk (key z)) (zv (value z)) (zl (lkid z)) (zr (rkid z)))
      (tree 'black zk zv zl zr)))
  (define (enlist t base)
    (cond ((empty? t) base)
          ((and (empty? (lkid t)) (empty? (rkid t)))
            (cons (cons (key t) (value t)) base))
          (else (enlist (lkid t)
                        (cons (cons (key t) (value t))
                              (enlist (rkid t) base))))))
  (let loop ((items items) (t empty))
    (if (pair? items)
        (call-with-values
          (lambda () (mapper (car items)))
          (lambda (k v) (loop (cdr items) (insert t k v))))
        (enlist t '()))))

The named-let loop in the body of the function calls mapper on each item, inserts the resulting key/value pair in the red-black tree, then calls enlist to write the output when the input is exhausted. The reducer, as mentioned above, is in the insert function, which already distinguishes the new-item case from the update-item case.

In the statement of the exercise we mentioned three applications of map-reduce: frequency analysis, cross-referencing, and anagrams. Here is an example of frequency analysis:

> (map-reduce
    (lambda (x) (values x 1))
    (lambda (k v1 v2) (+ v1 v2))
    char<?
    (string->list "banana"))
((#\a . 3) (#\b . 1) (#\n . 2))

The xref function produces a cross-reference listing of a file:

; xref: produce a cross-reference listing of the named file.
;   file -- name of the file to read; it becomes the current input port
;           while get-words runs
; Returns the list built by map-reduce: one entry per distinct word, each a
; pair whose car is the word and whose cdr is the list of line numbers
; collected for it, with entries ordered by string<?.
(define (xref file)
  (with-input-from-file file
    (lambda ()
      (map-reduce
        ; a (word . line) pair becomes key = word, value = (line)
        (lambda (x) (values (car x) (list (cdr x))))
        ; v1 is the list collected so far and v2 the new singleton list.
        ; Keep v1 unchanged when the new line number equals the one at its
        ; head (the word occurred again on that line); otherwise push the
        ; new number. eqv? is used because eq? is unspecified on numbers.
        (lambda (k v1 v2)
          (if (eqv? (car v1) (car v2))
              v1
              (cons (car v2) v1)))
        string<?
        (get-words)))))

The mapping function forms a singleton list of each line number, and the reducing function conses additional line numbers onto the list, skipping a line number that is already at the head of the list so that a word appearing several times on one line is listed only once for that line. Since get-words returns the input words in reverse order, the line numbers for each input word are consed onto the output list in ascending order. Get-words reads the input, associating each word with the line number where it appears:

; get-words: read every word from an input port and pair it with the number
; of the line on which it was read.
;   port -- optional input port; the current input port is used when omitted
; Returns a list of (word . line) pairs, most recently read word first.
; Lines are counted from 1, and a character belongs to a word when
; char-in-word? accepts it.
(define (get-words . port)
  ; turn a reversed list of characters into a string
  (define (finish chars) (list->string (reverse chars)))
  ; Read one token from in: a word, "" for a newline met outside a word,
  ; or the eof object when the input ends with no word pending. The
  ; character that ends a word is left unread for the next call.
  (define (next-token in)
    (let scan ((ch (peek-char in)) (acc '()))
      (cond ((eof-object? ch)
              (if (null? acc) ch (finish acc)))
            ((char-in-word? ch)
              (let ((taken (read-char in)))
                (scan (peek-char in) (cons taken acc))))
            ((not (null? acc)) (finish acc))
            ((char=? #\newline ch) (read-char in) "")
            (else (read-char in) (scan (peek-char in) acc)))))
  (let ((in (if (pair? port) (car port) (current-input-port))))
    (let collect ((tok (next-token in)) (line-no 1) (found '()))
      (cond ((eof-object? tok) found)
            ; an empty token marks the end of a line
            ((string=? "" tok)
              (collect (next-token in) (add1 line-no) found))
            (else
              (collect (next-token in)
                       line-no
                       (cons (cons tok line-no) found)))))))

Applied to the text of the map-reduce function (counting the line (define (map-reduce ...)) as line 1), xref produces:

(("0" 5) ("1" 6) ("2" 7) ("3" 8) ("4" 9)
 ("and" 13 18 22 26 43) ("balance" 12 36 37)
 ("base" 41 42 44 47) ("black" 3 11 15 17 20 21 24 25 28 29 40)
 ("black?" 11 13 18 22 26) ("c" 2 10 11 12 13 18 22 26 31)
 ("call-with-values" 50) ("car" 51) ("cdr" 52)
 ("color" 5 13 18 22 26 34) ("cond" 13 35 42) ("cons" 44 46)
 ("define" 1 2 3 4 5 6 7 8 9 10 11 12 32 33 41)
 ("else" 31 38 45) ("empty" 3 4 35 48) ("empty?" 4 35 42 43)
 ("enlist" 41 45 47 53) ("eqv?" 4 10 11) ("if" 49)
 ("ins" 33 36 37 39) ("insert" 32 52) ("items" 1 48 49 51 52)
 ("k" 2 12 17 21 24 28 31 32 35 36 37 38 52)
 ("key" 6 14 15 19 20 23 25 27 29 34 39 44 46)
 ("l" 2 12 13 14 15 16 17 18 19 20 21 24 28 31)
 ("lambda" 51 52) ("let" 34 48) ("let*" 39)
 ("lkid" 8 13 15 16 20 22 23 24 25 28 30 34 39 43 45)
 ("loop" 48 52) ("lt?" 1 36 37) ("map-reduce" 1)
 ("mapper" 1 51) ("nil" 3) ("pair?" 49)
 ("r" 2 12 17 21 22 23 24 25 26 27 28 29 30 31)
 ("red" 10 14 19 23 27 35) ("red?" 10 13 18 22 26)
 ("reducer" 1 38)
 ("rkid" 9 16 17 18 19 20 21 25 26 29 30 34 39 43 47)
 ("t" 4 5 6 7 8 9 32 33 34 35 39 41 42 43 44 45 46 47 48 52 53)
 ("tc" 34 36 37 38) ("tk" 34 36 37 38) ("tl" 34 36 37 38)
 ("tr" 34 36 37 38)
 ("tree" 2 3 14 15 17 19 20 21 23 24 25 27 28 29 31 35 38 40)
 ("tv" 34 36 37 38) ("v" 2 12 17 21 24 28 31 32 35 38 52)
 ("value" 7 14 15 19 20 23 25 27 29 34 39 44 46)
 ("vector" 2) ("vector-ref" 5 6 7 8 9) ("z" 39)
 ("zk" 39 40) ("zl" 39 40) ("zr" 39 40) ("zv" 39 40))

The third example produces anagrams from a word list; the mapper signs each word by sorting its letters, and the reducer brings together words with equivalent signatures:

; anagrams: group the strings in words into anagram classes.
;   words -- list of strings
; Returns one string per class, holding the members of that class separated
; by single spaces in the order they were supplied; the classes themselves
; come back in the key order map-reduce produces for string<?.
(define (anagrams words)
  ; a word's signature is its letters sorted with char<?
  (define (signature word)
    (list->string (sort char<? (string->list word))))
  ; mapper: key = signature, value = the word itself
  (define (sign word)
    (values (signature word) word))
  ; reducer: append the new word to the words already gathered
  (define (gather sig so-far word)
    (string-append so-far " " word))
  (map cdr (map-reduce sign gather string<? words)))

For instance:

> (anagrams '("time" "stop" "pots" "cars" "emit"))
("cars" "time emit" "stop pots")

The file-based variant of map-reduce reuses all the red-black tree code above, and adds a few lines of code to permit the input to be read from a named file, a port, or, by default, the current input port:

; map-reduce-input: like map-reduce, but the items are obtained by calling
; reader repeatedly on an input port instead of being passed as a list.
;   reader  -- called with the port; returns the next item, or an eof
;              object when the input is exhausted
;   mapper  -- called with one item; must return two values, key and value
;   reducer -- handed to the tree's insert, as in map-reduce
;   lt?     -- key ordering used by the tree
;   pof     -- optional "port or file": a string is opened as an input
;              file, any other argument is used directly as the port, and
;              with no argument the current input port is read
; Returns (enlist t '()) for the tree built from all the items.
(define (map-reduce-input reader mapper reducer lt? . pof)
  
… red-black tree functions above …
  ; f? is true only when a file name was given, i.e. when the port is
  ; opened here and therefore must also be closed here
  (let* ((f? (and (pair? pof) (string? (car pof))))
         (p (cond (f? (open-input-file (car pof)))
                  ((pair? pof) (car pof))
                  (else (current-input-port)))))
    (let loop ((item (reader p)) (t empty))
      (if (eof-object? item)
          ; a port supplied by the caller is left open
          (begin (if f? (close-input-port p))
                 (enlist t '()))
          (call-with-values
            (lambda () (mapper item))
            (lambda (k v) (loop (reader p) (insert t k v))))))))

The anagrams function uses sort from the Standard Prelude. You can run the code at http://programmingpraxis.codepad.org/h8YcTvLO.

Pages: 1 2

2 Responses to “MapReduce”

  1. […] Praxis – MapReduce By Remco Niemeijer In today’s Programming Praxis exercise, we have to implement the famous MapReduce algorithm. Let’s get […]

  2. Remco Niemeijer said

    My Haskell solution (see http://bonsaicode.wordpress.com/2009/10/06/programming-praxis-mapreduce/ for a version with comments):

    -- | Map every input to a key/value pair, combine the values that share a
    -- key with the reducer, and return the pairs ordered by @lt@.
    --
    -- Values for one key are combined left to right in input order:
    -- for inputs producing @v1, v2, v3@ under the same key the result is
    -- @r (r v1 v2) v3@. The reducer is not called for a key that occurs once.
    --
    -- Keys are grouped with their 'Ord' instance; @lt@ only determines the
    -- order of the output list.
    mapReduce :: Ord k => (a -> (k, v)) -> (v -> v -> v) ->
                          (k -> k -> Bool) -> [a] -> [(k, v)]
    mapReduce m r lt =
        sortBy byKey . M.assocs . M.fromListWith (flip r) . map m
      where
        -- fromListWith calls its function as @f new old@, so flipping the
        -- reducer keeps the accumulated value on the left.
        -- Three-way comparison, so keys that @lt@ cannot separate compare EQ
        -- instead of GT in both directions.
        byKey (a, _) (b, _)
          | lt a b    = LT
          | lt b a    = GT
          | otherwise = EQ
    
    -- | File-based variant of 'mapReduce': read the whole file at the given
    -- path, split its contents into items with @g@, and run 'mapReduce' with
    -- mapper @m@, reducer @r@ and ordering @lt@ over those items.
    mapReduceInput :: Ord k => (a -> (k, v)) -> (v -> v -> v) ->
        (k -> k -> Bool) -> (String -> [a]) -> FilePath -> IO [(k, v)]
    mapReduceInput m r lt g path = do
        contents <- readFile path
        return (mapReduce m r lt (g contents))
    

Leave a comment