Pedro Alcocer


Code notes


Implementing the in-mapper combiner pattern using Dumbo

Tuesday, May 03, 2011

In the MapReduce distributed computing paradigm, combiners are essential for reducing the overhead, especially disk I/O, of moving data from mappers to reducers: they aggregate mapper emissions by key before the data is transferred. In this post, I describe a design pattern, in-mapper combining, that in many cases performs better than combiners for pre-Reduce aggregation. I present an example of how to implement the pattern in Dumbo, a Python API for writing MapReduce jobs to be run on Hadoop clusters.

Combiners and in-mapper combining

In Data-Intensive Text Processing with MapReduce, Lin and Dyer describe a design pattern, which they call “in-mapper combining”, for implementing combiner functionality within every map task. This pattern makes a discrete Combine step between Map and Reduce unnecessary. Typically, it is not guaranteed that a combiner function will be called on every mapper or that, if called, it will be called only once. This is a potential source of inefficiency in a MapReduce job. The in-mapper combiner design pattern guarantees that combiner-like key aggregation occurs in every mapper, instead of optionally in some mappers. The aggregation is done entirely in memory, without touching disk, and it happens before any emission code has been called, reducing overhead still further.

Lin and Dyer mention some drawbacks of this pattern (see note 1) which, though important, I won’t go into here.

An example in pseudocode

For counting words, probably the simplest mapper function you could write is:

def mapper(document_id, document):
  for word in document:
    emit word, 1

In this case, the mapper emits one key-value pair for every word in a document. So, if a document consists of one fish, two fish, red fish, blue fish, the mapper will emit eight times.

one     1
fish    1
two     1
fish    1
red     1
fish    1
blue    1
fish    1

With the above mapper, the pair (fish, 1) is emitted four times. It would be more efficient if the four instances of fish could be aggregated before emission. This is what the in-mapper combiner does, which the pseudocode below illustrates.

class Mapper:
  def initialize():
    H = new AssociativeArray

  def map(document_id, document):
    for word in document:
      H[word] += 1

  def close():
    for word in H:
      emit(word, H[word])

The Mapper class is instantiated once for every Map task. When it is instantiated, an associative array (dict in Python, or Map in Java) is created. The map method is called for every document in the dataset that is assigned to that Map task. As the mapper is fed documents, it keeps a tally of word counts in the associative array. Finally, at the end of the Map task, the close method is called, and the words stored in the array are emitted together with their aggregate counts.

one     1
fish    4
two     1
red     1
blue    1
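
To make the pseudocode concrete, here is a minimal runnable sketch of the same three-phase structure in plain Python, with no Hadoop or Dumbo involved. The class name InMapperCombiner and the document id "doc1" are made up for illustration, and splitting on whitespace is an assumption about how a document is tokenized.

```python
from collections import defaultdict


class InMapperCombiner:
    """Plain-Python version of the pseudocode Mapper (illustrative only)."""

    def __init__(self):
        # initialize: one tally per Map task
        self.counts = defaultdict(int)

    def map(self, document_id, document):
        # map: update the tally instead of emitting (word, 1)
        for word in document.split():
            self.counts[word] += 1

    def close(self):
        # close: emit each word once, with its aggregate count
        for word, count in self.counts.items():
            yield word, count


mapper = InMapperCombiner()
mapper.map("doc1", "one fish two fish red fish blue fish")
emitted = list(mapper.close())
print(emitted)
```

Here the caller has to invoke close explicitly once the input is exhausted, which is exactly the step that needs a hook in a real framework.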

Implementing in-mapper combining in Python with Dumbo

A problem with implementing this pattern using Dumbo is that, while it is clear how to write the initialize and map methods (see note 2), it is not clear how to write something like the close method and have Dumbo call it. Typically, one provides Dumbo with a callable mapper object (i.e., a function or a class that implements __call__) that takes a key and a value as arguments. Dumbo passes this callable object on to Hadoop Streaming, which calls it for every key-value pair until they run out. At that point the Reduce step begins (see note 3), or the Combine step if one is specified. At no point can a close method be called on the mapper object.

Just to be explicit about how Dumbo works according to the documentation, the “simplest” mapper function, above, would be written as:

def mapper(document_id, document): # key: document_id, value: document
  for word in document.split():
    yield word, 1

or, with a class, as:

class Mapper:
  def __init__(self):
    pass
  def __call__(self, document_id, document):
    for word in document.split():
      yield word, 1

It turns out, though, that Dumbo provides an ill-documented alternative interface to mapper/reducer functions that is enabled when the callable object in question is passed a single argument instead of the usual two. Inside the function, this argument is an iterable object that generates key-value tuples.

This is how the “simplest” mapper function looks when implemented using the alternative interface.

def mapper(data):
  for document_id, document in data:
    for word in document.split():
      yield word, 1

It is a subtle change, but this change is what allows us to (finally) implement the in-mapper combiner, seen below.

from collections import defaultdict

def mapper(data):
  
  # initialize
  H = defaultdict(int)

  # map
  for document_id, document in data:
    for term in document.split():
      H[term] += 1
  
  # close
  for word in H:
    yield word, H[word]

This mapper function is only called once per Map task. Instead of calling the function with every key-value pair, key-value pairs are smuggled into the function through the lazy iterator data. This allows us to run code before and, crucially, after the mapper has consumed all the data it was going to get.

Specifically, we can set up a dict for storing words and their counts (initialize, in the pseudocode example), fill the dict with the iterator (map), and then emit what’s stored in the dict (close).
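
One way to check the behaviour without a cluster is to drive the one-argument mapper by hand. The sketch below is only a stand-in for the framework: run_locally, the sample documents, and the in-memory sort-and-group step are all hypothetical and are not part of Dumbo. The reducer follows the usual key-plus-iterable-of-values shape.

```python
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def mapper(data):
    # initialize, map, close: all inside one call per Map task
    counts = defaultdict(int)
    for document_id, document in data:
        for word in document.split():
            counts[word] += 1
    for word, count in counts.items():
        yield word, count


def reducer(word, counts):
    # sums the partial counts coming from the different Map tasks
    yield word, sum(counts)


def run_locally(map_tasks):
    """Hypothetical harness: one mapper call per task, then sort, group, reduce."""
    emitted = []
    for task_input in map_tasks:
        emitted.extend(mapper(iter(task_input)))
    emitted.sort(key=itemgetter(0))
    results = {}
    for word, pairs in groupby(emitted, key=itemgetter(0)):
        for key, total in reducer(word, (count for _, count in pairs)):
            results[key] = total
    return emitted, results


map_tasks = [
    [("doc1", "one fish two fish")],
    [("doc2", "red fish blue fish"), ("doc3", "one fish")],
]
emitted, results = run_locally(map_tasks)
print(len(emitted), results)
```

The ten words in the sample input reach the reducer as seven pairs, because each Map task emits a word at most once no matter how many documents it was given.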


  1. For example, because aggregation happens in memory, it’s entirely possible to run out of memory on a Map node.

  2. Using __init__ and __call__, respectively.

  3. Technically, at that point, the sort, shuffle, and copy steps begin, but that’s not really relevant.
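
The memory concern in note 1 can be softened by flushing the dict whenever it grows past a limit, at the cost of emitting some words more than once. The sketch below is one possible way to do this under stated assumptions: make_bounded_mapper and max_keys are hypothetical names, and counting distinct keys is a crude proxy for memory use. It relies on a reducer (or combiner) that sums the partial counts.

```python
from collections import defaultdict


def make_bounded_mapper(max_keys):
    """Build a one-argument mapper that holds at most max_keys + 1 words in memory."""

    def mapper(data):
        counts = defaultdict(int)
        for document_id, document in data:
            for word in document.split():
                counts[word] += 1
                if len(counts) > max_keys:
                    # flush the partial counts and start a fresh tally
                    for item in counts.items():
                        yield item
                    counts.clear()
        # close: emit whatever is left
        for item in counts.items():
            yield item

    return mapper


mapper = make_bounded_mapper(max_keys=2)
emitted = list(mapper([("doc1", "one fish two fish red fish blue fish")]))

# Sum the partial counts the way a reducer would.
totals = defaultdict(int)
for word, count in emitted:
    totals[word] += count
print(emitted, dict(totals))
```

With such a small limit the mapper emits fish three times, but the totals still come out right after summing; a larger limit trades memory for fewer emissions.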


Plotting MEG data with R and ggplot2

Friday, February 26, 2010

Here's a bit of R code for plotting the processed MEG data that Cephalo outputs. It uses the ggplot2 plotting package for easy, good-looking plots.

library(ggplot2)

# Download some data.
meg.data = read.csv("http://pealco.net/downloads/ws2010_meg.csv")

# Create a root mean square function.
rms = function(x) { sqrt(mean(x**2)) }

# Set up x and y aesthetic mappings.
meg.plot = ggplot(meg.data, aes(x = sample, y = amplitude))

# Apply a line geom, faceting, and a stat layer.
meg.plot + geom_line(aes(group=channel), color=alpha("black", 1/4)) +
           facet_grid(factor1 ~ factor0) +
           stat_summary(fun = rms, color = "red", geom = "line")  # ggplot2 before 3.3.0: use fun.y instead of fun

The result: [Figure: the plot produced by the code above]


Repeated measures ANOVAs in R

Friday, January 30, 2009

Repeated measures ANOVAs in R are performed with the aov() function. The aov() function requires fully balanced data to work correctly, i.e., there can be no missing values in the data. In order to use aov() on data with missing values (most experimental data has excluded values), one first needs to average within each subject (or item), so that every subject contributes exactly one value to every cell of the design.

Below, we assume a data frame data with at least the columns subject, factor1, factor2, and logRT. The code below computes one mean logRT per subject for each combination of factor1 and factor2, returning the data frame data.subject.

data.subject <- aggregate(data$logRT, list(data$subject, data$factor1, data$factor2), mean)
colnames(data.subject) <- c("subject", "factor1", "factor2", "logRT")

An alternative is to use the excellent reshape package.

library(reshape)
data.subject <- recast(data, subject + factor1 + factor2 ~ variable, mean, measure.var="logRT")

Once you've created the data.subject data frame using one of the methods above, you can then perform the repeated measures ANOVA on data.subject.

summary(aov(logRT ~ factor1 * factor2 + Error(subject/(factor1 * factor2)), data=data.subject))

A by-items analysis is performed in the same way. You'd simply need to replace subject with item where appropriate.


This page gets a lot of search hits, so I'd like to make it as useful as possible. If you find this post useful or confusing, or have questions about it, or have suggestions on how to improve it, please make use of the comments.


Calculating d-prime in R

Friday, January 30, 2009

This is how to calculate d-prime in R. This function takes a data frame called data and returns a d-prime score for each unique entry in the subject column of the data frame. Note that the function will return Inf or -Inf if any value in Hrate or Frate is 1 or 0. This happens whenever someone responds correctly or incorrectly on all the trials.

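dprime <- function(data) {
    # Assumes resp == "Y" means "signal present" and acc == 1 means correct,
    # so signal trials are those answered with a correct "Y" or an incorrect "N".
    signal <- subset(data, (resp == "Y") == (acc == 1))
    noise <- subset(data, (resp == "Y") != (acc == 1))
    hit <- subset(data, resp=="Y" & acc == 1)
    falsealarm <- subset(data, resp=="Y" & acc == 0)
    
    # Hit rate = hits / signal trials; false-alarm rate = false alarms / noise trials.
    Hrate <- xtabs(~subject, data=hit)/xtabs(~subject, data=signal)
    Frate <- xtabs(~subject, data=falsealarm)/xtabs(~subject, data=noise)
    dprime_score <- qnorm(Hrate) - qnorm(Frate)
    
    return(dprime_score)
}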

A simple usage example:

> data
   subject resp acc
1        1    Y   0
2        1    Y   1
3        1    Y   1
4        1    N   0
5        1    N   1
6        2    Y   1
7        2    N   1
8        2    Y   0
9        2    Y   1
10       2    N   0

> dprime(data)
subject
        1         2 
0.4307273 0.4307273
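
The arithmetic behind this output can be cross-checked outside R. In the example, both subjects end up with Hrate = 2/3 and Frate = 1/2, so the score is qnorm(2/3) - qnorm(1/2). The sketch below reproduces that in Python using only the standard library; dprime_from_rates is a hypothetical helper name, not part of the R function above.

```python
from statistics import NormalDist


def dprime_from_rates(hit_rate, false_alarm_rate):
    """d-prime as the difference of two standard-normal quantiles."""
    z = NormalDist().inv_cdf  # plays the role of R's qnorm
    return z(hit_rate) - z(false_alarm_rate)


score = dprime_from_rates(2 / 3, 1 / 2)
print(round(score, 7))  # 0.4307273
```

One difference from R: where qnorm returns Inf or -Inf for a rate of exactly 1 or 0, NormalDist.inv_cdf raises a StatisticsError, so those cases have to be handled before the call.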