A few days ago, I started reading a really interesting book called "Data-Intensive Text Processing with MapReduce" (free version is available here). In it, the authors discuss some common patterns for writing MapReduce jobs.
One of these patterns is in-mapper combining. In its most basic form, this strategy is designed to save time by doing some aggregation within the mapper instead of emitting tons of
(k, v) pairs which will bog everything down in the shuffle-and-sort phase.
Using the canonical word count example, Lin and Dyer give the basic MapReduce implementation:
!wget --output-document=data/great-expectations.txt http://www.gutenberg.org/cache/epub/1400/pg1400.txt
--2014-01-02 18:44:18-- http://www.gutenberg.org/cache/epub/1400/pg1400.txt Resolving www.gutenberg.org (www.gutenberg.org)... 184.108.40.206 Connecting to www.gutenberg.org (www.gutenberg.org)|220.127.116.11|:80... connected. HTTP request sent, awaiting response... 200 OK Length: 1033768 (1010K) [text/plain] Saving to: ‘data/great-expectations.txt’ 100%[======================================>] 1,033,768 357KB/s in 2.8s 2014-01-02 18:44:22 (357 KB/s) - ‘data/great-expectations.txt’ saved [1033768/1033768]
We'll want to make everything lower case, remove punctuation, and for convenience change multiple whitespace characters to a single space. We could write Python for this but we may as well just save the effort and chain some quick command line tools.
!cat data/great-expectations.txt | tr [:upper:] [:lower:] | sed 's/[^a-z\n]/\ /g' | sed -e 's/\s\+/\ /g' > data/great-expectations-cleaned.txt
Here's what the file looks like cleaned up:
!sed -n '3000,3010p' data/great-expectations-cleaned.txt
had imitated from the heading of some newspaper and which i supposed until she told me what it was to be a design for a buckle of course there was a public house in the village and of course joe liked sometimes to smoke his pipe there i had received strict orders from my sister to call for him at the three jolly bargemen that evening on my way from school and bring him home at my peril to the three jolly bargemen therefore i directed my steps there was a bar at the jolly bargemen with some alarmingly long chalk scores in it on the wall at the side of the door which seemed to me to
We're ready to set up our MapReduce job. Here's a really simple version adapted straight from the
from mrjob.job import MRJob class MRWordFreqCount(MRJob): """ extend mrjob's base class and define mapper and reducer """ def mapper(self, _, line): """ splits a line of text by whitespace and returns (word, 1) """ for word in line.strip().split(): yield word.lower(), 1 def reducer(self, word, counts): """ sum the counts """ yield word, sum(counts) if __name__ == '__main__': MRWordFreqCount.run()
The problem with doing it the straight-forward way is that we will be emitting a
(word, 1) pair for every single word in the entire document. As you can imagine, emitting more pairs from the mappers will make the shuffle-and-sort phase longer. Let's see how many pairs we'll emit in our Great Expectations example:
!wc -w data/great-expectations-cleaned.txt
Here we're dealing with just under 200,000 words. That already sounds like a lot, but imagine how many more we'd be dealing with if instead of processing just one book we were trying to examine relative word counts in the entire Project Gutenberg corpus or the whole web! Even on a large Amazon EMR cluster, the shuffle-and-sort phase could take a while. For truly large jobs, we might even have to worry about running out of memory.
Lin and Dyer describe a common optimization for this scenario, the in-mapper combining pattern. Here, we count up all the occurences a of word within the mapper, and then emit pairs where the value has already been aggregated. Here's their pseudocode:
We'll build on our previous implentation, this time using a plain old Python
defaultdict to implement our in-mapper combiner.
from mrjob.job import MRJob from collections import defaultdict class MRWordFreqCountInMapperCombiner(MRJob): def mapper_init(self): """ set up our temporary map from keys to values """ self.word_counts = defaultdict(int) def mapper(self, _, line): """ increment the appropriate words in our counter """ for word in line.strip().split(): self.word_counts[word] += 1 def mapper_final(self): """ now emit all the (k, v) pairs we stored """ for word, value in self.word_counts.iteritems(): yield word, value def reducer(self, word, counts): """ sum the counts """ yield word, sum(counts) if __name__ == '__main__': MRWordFreqCountInMapperCombiner.run()
We'll actually save this to a file and run it from the command line (running it from an IPython notebook is a pain).
!python in_mapper_combine.py < data/great-expectations-cleaned.txt > data/output.txt
no configs found; falling back on auto-configuration no configs found; falling back on auto-configuration creating tmp directory /tmp/in_mapper_combine.isaac.20140102.234422.843666 reading from STDIN writing to /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-mapper_part-00000 Counters from step 1: (no counters found) writing to /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-mapper-sorted > sort /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-mapper_part-00000 writing to /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-reducer_part-00000 Counters from step 1: (no counters found) Moving /tmp/in_mapper_combine.isaac.20140102.234422.843666/step-0-reducer_part-00000 -> /tmp/in_mapper_combine.isaac.20140102.234422.843666/output/part-00000 Streaming final output from /tmp/in_mapper_combine.isaac.20140102.234422.843666/output removing tmp directory /tmp/in_mapper_combine.isaac.20140102.234422.843666
Here's what the results look like:
!head -n 20 data/output.txt
"a" 4113 "aback" 1 "abandoned" 2 "abased" 1 "abashed" 1 "abbey" 1 "abear" 2 "abel" 6 "aberdeen" 1 "aberration" 1 "abet" 1 "abeyance" 1 "abhorrence" 4 "abhorrent" 2 "abhorring" 1 "abide" 4 "abided" 1 "abilities" 3 "ability" 1 "abject" 3
One thing Lin and Dyer make sure to point out is that in-mapper combining is not a silver bullet:
There are, however, drawbacks to the in-mapper combining pattern. First, it breaks the functional programming underpinnings of MapReduce, since state is being preserved across multiple input key-value pairs. Ultimately, this isn't a big deal, since pragmatic concerns for efficiency often trump theoretical "purity", but there are practical consequences as well. Preserving state across multiple input instances means that algorithmic behavior may depend on the order in which input key-value pairs are encountered. This creates the potential for ordering-dependent bugs, which are difficult to debug on large datasets in the general case (although the correctness of in-mapper combining for word count is easy to demonstrate). Second, there is a fundamental scalability bottleneck associated with the in-mapper combining pattern. It critically depends on having sufficient memory to store intermediate results until the mapper has completely processed all key-value pairs in an input split. In the word count example, the memory footprint is bound by the vocabulary size, since it is theoretically possible that a mapper encounters every term in the collection. Heap's Law, a well-known result in information retrieval, accurately models the growth of vocabulary size as a function of the collection size—the somewhat surprising fact is that the vocabulary size never stops growing.
 Lin, Jimmy, and Chris Dyer. "Data-intensive text processing with MapReduce." Synthesis Lectures on Human Language Technologies 3.1 (2010): 1-177.