Pedro Alcocer


Implementing the in-mapper combiner pattern using Dumbo

Tuesday, May 03, 2011


In the MapReduce distributed computing paradigm, combiners are essential for reducing the overhead (especially disk I/O) of moving data from mappers to reducers, because they aggregate mapper output by key before it is passed on. In this post, I describe a design pattern, in-mapper combining, that in many cases performs better than combiners for pre-Reduce aggregation. I present an example of how to implement the pattern in Dumbo, a Python API for writing MapReduce jobs to be run on Hadoop clusters.

Combiners and in-mapper combining

In Data-Intensive Text Processing with MapReduce, Lin and Dyer describe a design pattern, which they call “in-mapper combining”, for implementing combiner functionality within every map task. This pattern makes a discrete Combine step between Map and Reduce unnecessary. A combiner is normally treated as an optional optimization: there is no guarantee that it will be applied to the output of every mapper, or that, if applied, it will run only once. This is a potential source of inefficiency in a MapReduce job. The in-mapper combiner design pattern guarantees that combiner-like key aggregation occurs in every mapper, instead of optionally in some mappers. The aggregation is done entirely in memory, without touching disk, and it happens before any emission code is called, reducing overhead still further.
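Because the framework may apply a combiner zero, one, or several times, a combiner must give the same final result however often it runs. The plain-Python simulation below illustrates this for word counting. It is only a sketch: `combine` and `reduce_counts` are hypothetical helpers, and a real Hadoop job decides on its own when to invoke a combiner. Summing counts is associative and commutative, so the reducer's totals never change; only the number of pairs passed between stages does.

```python
from collections import defaultdict


def combine(pairs):
    """Hypothetical word-count combiner: sum the counts for each key."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return list(totals.items())


def reduce_counts(pairs):
    """Hypothetical reducer: the same summation, producing final totals."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


# Raw mapper output for "one fish, two fish, red fish, blue fish".
mapped = [(w, 1) for w in "one fish two fish red fish blue fish".split()]

# Simulate the framework running the combiner 0, 1, or 2 times.
zero = mapped
once = combine(mapped)
twice = combine(combine(mapped))

# Prints 8, 5 and 5 pairs, with identical final totals each time.
for pairs in (zero, once, twice):
    print(len(pairs), reduce_counts(pairs))
```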

Lin and Dyer mention some drawbacks of this pattern[1], which, though important, I won’t go into here.

An example in pseudocode

For counting words, probably the simplest mapper function you could write is:

def mapper(document_id, document):
  for word in document:
    emit word, 1

In this case, the mapper emits one key-value pair for every word in a document. So, if a document consists of “one fish, two fish, red fish, blue fish”, the mapper emits eight key-value pairs.

one     1
fish    1
two     1
fish    1
red     1
fish    1
blue    1
fish    1
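The pseudocode mapper translates almost directly into a Python generator, with `yield` standing in for `emit`. This sketch assumes the document arrives as a plain string and replaces commas with spaces so that `fish,` and `fish` count as the same word. The document id `"doc-1"` is a made-up placeholder, and tokenization is otherwise left naive.

```python
def mapper(document_id, document):
    """Naive word-count mapper: one (word, 1) pair per word occurrence."""
    # Treat commas as separators, then split on whitespace.
    for word in document.replace(",", " ").split():
        yield word, 1


pairs = list(mapper("doc-1", "one fish, two fish, red fish, blue fish"))
for word, count in pairs:
    print(word, count)
print(len(pairs), "pairs emitted")  # 8 pairs emitted
```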

With the above mapper, the pair (fish, 1) is emitted four times. It would be more efficient if the four instances of fish could be aggregated before emission. This is what the in-mapper combiner does, as the pseudocode below illustrates.

class Mapper:
  def initialize():
    H = new AssociativeArray

  def map(document_id, document):
    for word in document:
      H[word] += 1

  def close():
    for word in H:
      emit(word, H[word])

One Mapper object is created for every Map task, and its initialize method is called once, creating an associative array (dict in Python, or Map in Java). The map method is then called for every document in the dataset that is assigned to that Map task. As the mapper is fed documents, it keeps a tally of word counts in the associative array. Finally, at the end of the Map task, the close method is called, and it emits each word stored in the array along with its aggregate count.

one     1
fish    4
two     1
red     1
blue    1
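The same in-mapper combiner can be written as a runnable Python class. Here `run_map_task` is a hypothetical stand-in for the framework, not a Hadoop or Dumbo API. It creates one `Mapper` for the task, calls `initialize` once, calls `map` once per document, and finally drains `close`, which is the lifecycle described above. As in the previous sketch, commas are treated as separators and `"doc-1"` is a placeholder id.

```python
from collections import defaultdict


class Mapper:
    def initialize(self):
        # One associative array per Map task, shared across map() calls.
        self.H = defaultdict(int)

    def map(self, document_id, document):
        # Tally words in memory; nothing is emitted yet.
        for word in document.replace(",", " ").split():
            self.H[word] += 1

    def close(self):
        # Emit each word once, with its aggregate count for this task.
        for word, count in self.H.items():
            yield word, count


def run_map_task(documents):
    """Hypothetical driver simulating a single Map task's lifecycle."""
    mapper = Mapper()
    mapper.initialize()
    for document_id, document in documents:
        mapper.map(document_id, document)
    return list(mapper.close())


result = run_map_task([("doc-1", "one fish, two fish, red fish, blue fish")])
print(result)  # [('one', 1), ('fish', 4), ('two', 1), ('red', 1), ('blue', 1)]
```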

Implementing in-mapper combining in Python with Dumbo

A problem with implementing this pattern using Dumbo is that, while it is clear how to write the initialize and map methods[2], it is not clear how to write something like the close method and have Dumbo call it. Typically, one provides Dumbo with a callable mapper object (i.e., a function or a class that implements __call__) that takes a key and a value as arguments. Dumbo passes this callable object on to Hadoop Streaming, which calls it for every key-value pair until the pairs run out. At that point the Reduce step begins[3] (or the Combine step, if one is specified). At no point is a close method called on the mapper object.
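To make the problem concrete, the sketch below imitates the per-pair calling convention in plain Python. `run_per_pair` and `TallyingMapper` are hypothetical names, not Dumbo code. The mapper tallies words and waits for a `close` hook to emit them, but the driver only calls the mapper once per key-value pair and then returns, so the tallies never leave the mapper.

```python
class TallyingMapper:
    """Hypothetical mapper that relies on a close() hook to emit its counts."""

    def __init__(self):
        self.counts = {}
        self.closed = False

    def __call__(self, key, value):
        for word in value.split():
            self.counts[word] = self.counts.get(word, 0) + 1
        return iter(())  # emits nothing per pair; it is waiting for close()

    def close(self):
        self.closed = True
        return iter(self.counts.items())


def run_per_pair(map_callable, records):
    """Hypothetical stand-in for the per-pair calling convention."""
    output = []
    for key, value in records:
        # One call per key-value pair.
        output.extend(map_callable(key, value))
    # Input exhausted: the driver returns without ever calling close().
    return output


records = [("doc-1", "one fish two fish"), ("doc-2", "red fish blue fish")]
m = TallyingMapper()
output = run_per_pair(m, records)
print(output)    # []
print(m.closed)  # False
```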

Just to be explicit about how Dumbo works according to the documentation, the “simplest” mapper function, above, would be written as:

def mapper(document_id, document): # key: document_id, value: document
  for word in document.split():
    yield word, 1

or, with a class, as:

class Mapper:
  def __init__(self):
    pass
  def __call__(self, document_id, document):
    for word in document.split():
      yield word, 1

It turns out, though, that Dumbo provides an ill-documented alternative interface to mapper/reducer functions, which is enabled when the callable object in question takes a single argument instead of the usual two. Inside the function, this argument is an iterable object that generates key-value tuples.
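One way a framework could support both interfaces is to check how many parameters the mapper declares. The `dispatch` function below is a hypothetical illustration of that idea using Python's `inspect.signature`. It is not taken from Dumbo's source, and Dumbo's actual detection logic may differ. A one-parameter callable is called once with an iterator over every key-value tuple, so it can see the whole input; a two-parameter callable is called once per pair. The two example mappers are also made up for illustration.

```python
import inspect


def dispatch(map_callable, records):
    """Hypothetical: choose the calling convention from the parameter count."""
    n_params = len(inspect.signature(map_callable).parameters)
    if n_params == 1:
        # Alternative interface: a single call, fed an iterator of tuples.
        return list(map_callable(iter(records)))
    # Usual interface: one call per key-value pair.
    output = []
    for key, value in records:
        output.extend(map_callable(key, value))
    return output


def words_per_document(document_id, document):
    # Two parameters: sees one record at a time.
    yield document_id, len(document.split())


def count_records(data):
    # One parameter: sees the entire stream in a single call.
    yield "records", sum(1 for _ in data)


records = [("doc-1", "one fish two fish"), ("doc-2", "red fish blue fish")]
print(dispatch(words_per_document, records))  # [('doc-1', 4), ('doc-2', 4)]
print(dispatch(count_records, records))       # [('records', 2)]
```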

This is how the “simplest” mapper function looks when implemented using the alternative interface.

def mapper(data):
  for document_id, document in data:
    for word in document.split():
      yield word, 1

It is a subtle change, but this change is what allows us to (finally) implement the in-mapper combiner, seen below.

from collections import defaultdict

def mapper(data):

  # initialize
  H = defaultdict(int)

  # map
  for document_id, document in data:
    for word in document.split():
      H[word] += 1

  # close
  for word, count in H.items():
    yield word, count

This mapper function is called only once per Map task. Rather than the function being called with every key-value pair, the key-value pairs are smuggled into it through the lazy iterator data. This allows us to run code before and, crucially, after the mapper has consumed all the data it was going to get.

Specifically, we can set up a dict for storing words and their counts (initialize, in the pseudocode example), fill the dict from the iterator (map), and then emit what’s stored in the dict (close).
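The in-mapper combiner can be exercised without Hadoop by handing it an iterator directly, roughly the position the alternative interface puts it in. `logged` is a hypothetical helper that records each input record as the mapper pulls it. The event log shows every record being read before the first pair is emitted, i.e. the close section runs only after the map section has drained the input, and only five aggregated pairs come out instead of eight.

```python
from collections import defaultdict


def mapper(data):
    # initialize
    H = defaultdict(int)
    # map
    for document_id, document in data:
        for word in document.split():
            H[word] += 1
    # close
    for word, count in H.items():
        yield word, count


events = []


def logged(records):
    """Hypothetical helper: note each record as the mapper consumes it."""
    for key, value in records:
        events.append(("read", key))
        yield key, value


records = [("doc-1", "one fish two fish"), ("doc-2", "red fish blue fish")]
for word, count in mapper(logged(records)):
    events.append(("emit", word))
    print(word, count)

# Both "read" events precede every "emit" event.
print(events)
```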


  1. For example, because aggregation happens in memory, it’s entirely possible to run out of memory on a Map node.

  2. Using __init__ and __call__, respectively.

  3. Technically, at that point, the sort, shuffle, and copy steps begin, but that’s not really relevant.