### In-mapper combiner pattern for MapReduce

A few days ago, I started reading a really interesting book called "Data-Intensive Text Processing with MapReduce" (free version is available here).[1] In it, the authors discuss some common patterns for writing MapReduce jobs.

One of these patterns is in-mapper combining. In its most basic form, this strategy is designed to save time by doing some aggregation within the mapper instead of emitting tons of (k, v) pairs which will bog everything down in the shuffle-and-sort phase.

Using the canonical word count example, Lin and Dyer give the basic MapReduce implementation:

We can implement this very quickly in Python using the mrjob package. First, let's get a corpus to work on. We'll use a plain text version of "Great Expectations" from Project Gutenberg.

In [1]:
!wget --output-document=data/great-expectations.txt http://www.gutenberg.org/cache/epub/1400/pg1400.txt

--2014-01-02 18:44:18--  http://www.gutenberg.org/cache/epub/1400/pg1400.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1033768 (1010K) [text/plain]
Saving to: ‘data/great-expectations.txt’

100%[======================================>] 1,033,768    357KB/s   in 2.8s

2014-01-02 18:44:22 (357 KB/s) - ‘data/great-expectations.txt’ saved [1033768/1033768]



We'll want to make everything lower case, remove punctuation, and for convenience change multiple whitespace characters to a single space. We could write Python for this but we may as well just save the effort and chain some quick command line tools.

In [2]:
!cat data/great-expectations.txt | tr [:upper:] [:lower:] | sed 's/[^a-z\n]/\ /g' | sed -e 's/\s\+/\ /g' > data/great-expectations-cleaned.txt


Here's what the file looks like cleaned up:

In [3]:
!sed -n '3000,3010p' data/great-expectations-cleaned.txt

had imitated from the heading of some newspaper and which i supposed
until she told me what it was to be a design for a buckle

of course there was a public house in the village and of course joe
liked sometimes to smoke his pipe there i had received strict orders
from my sister to call for him at the three jolly bargemen that
evening on my way from school and bring him home at my peril to the
three jolly bargemen therefore i directed my steps

there was a bar at the jolly bargemen with some alarmingly long chalk
scores in it on the wall at the side of the door which seemed to me to


We're ready to set up our MapReduce job. Here's a really simple version adapted straight from the mrjob documentation:

from mrjob.job import MRJob

class MRWordFreqCount(MRJob):
""" extend mrjob's base class and define mapper and reducer """

def mapper(self, _, line):
""" splits a line of text by whitespace and returns (word, 1) """
for word in line.strip().split():
yield word.lower(), 1

def reducer(self, word, counts):
""" sum the counts """
yield word, sum(counts)

if __name__ == '__main__':
MRWordFreqCount.run()


The problem with doing it the straight-forward way is that we will be emitting a (word, 1) pair for every single word in the entire document. As you can imagine, emitting more pairs from the mappers will make the shuffle-and-sort phase longer. Let's see how many pairs we'll emit in our Great Expectations example:

In [4]:
!wc -w data/great-expectations-cleaned.txt

192047 data/great-expectations-cleaned.txt


Here we're dealing with just under 200,000 words. That already sounds like a lot, but imagine how many more we'd be dealing with if instead of processing just one book we were trying to examine relative word counts in the entire Project Gutenberg corpus or the whole web! Even on a large Amazon EMR cluster, the shuffle-and-sort phase could take a while. For truly large jobs, we might even have to worry about running out of memory.

Lin and Dyer describe a common optimization for this scenario, the in-mapper combining pattern. Here, we count up all the occurences a of word within the mapper, and then emit pairs where the value has already been aggregated. Here's their pseudocode:

We'll build on our previous implentation, this time using a plain old Python defaultdict to implement our in-mapper combiner.

from mrjob.job import MRJob
from collections import defaultdict

class MRWordFreqCountInMapperCombiner(MRJob):

def mapper_init(self):
""" set up our temporary map from keys to values """
self.word_counts = defaultdict(int)

def mapper(self, _, line):
""" increment the appropriate words in our counter """
for word in line.strip().split():
self.word_counts[word] += 1

def mapper_final(self):
""" now emit all the (k, v) pairs we stored """
for word, value in self.word_counts.iteritems():
yield word, value

def reducer(self, word, counts):
""" sum the counts """
yield word, sum(counts)

if __name__ == '__main__':
MRWordFreqCountInMapperCombiner.run()


We'll actually save this to a file and run it from the command line (running it from an IPython notebook is a pain).

In [5]:
!python in_mapper_combine.py < data/great-expectations-cleaned.txt > data/output.txt

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /tmp/in_mapper_combine.isaac.20140102.234422.843666
writing to /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-mapper_part-00000
Counters from step 1:
(no counters found)
writing to /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-mapper-sorted
> sort /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-mapper_part-00000
writing to /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-reducer_part-00000
Counters from step 1:
(no counters found)
Moving /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-reducer_part-00000 -> /tmp/in_mapper_combine.isaac.20140102.234422.843666/output/part-00000
Streaming final output from /tmp/in_mapper_combine.isaac.20140102.234422.843666/output
removing tmp directory /tmp/in_mapper_combine.isaac.20140102.234422.843666


Here's what the results look like:

In [6]:
!head -n 20 data/output.txt

"a"	4113
"aback"	1
"abandoned"	2
"abased"	1
"abashed"	1
"abbey"	1
"abear"	2
"abel"	6
"aberdeen"	1
"aberration"	1
"abet"	1
"abeyance"	1
"abhorrence"	4
"abhorrent"	2
"abhorring"	1
"abide"	4
"abided"	1
"abilities"	3
"ability"	1
"abject"	3


One thing Lin and Dyer make sure to point out is that in-mapper combining is not a silver bullet:

There are, however, drawbacks to the in-mapper combining pattern. First, it breaks the functional programming underpinnings of MapReduce, since state is being preserved across multiple input key-value pairs. Ultimately, this isn't a big deal, since pragmatic concerns for efficiency often trump theoretical "purity", but there are practical consequences as well. Preserving state across multiple input instances means that algorithmic behavior may depend on the order in which input key-value pairs are encountered. This creates the potential for ordering-dependent bugs, which are difficult to debug on large datasets in the general case (although the correctness of in-mapper combining for word count is easy to demonstrate). Second, there is a fundamental scalability bottleneck associated with the in-mapper combining pattern. It critically depends on having sufficient memory to store intermediate results until the mapper has completely processed all key-value pairs in an input split. In the word count example, the memory footprint is bound by the vocabulary size, since it is theoretically possible that a mapper encounters every term in the collection. Heap's Law, a well-known result in information retrieval, accurately models the growth of vocabulary size as a function of the collection size—the somewhat surprising fact is that the vocabulary size never stops growing.

[1] Lin, Jimmy, and Chris Dyer. "Data-intensive text processing with MapReduce." Synthesis Lectures on Human Language Technologies 3.1 (2010): 1-177.

Any comments or suggestions? Let me know.