Analytics/Archive/Hadoop Streaming
This page will eventually become a really great document on how to use Hadoop Streaming with Wikimedia data.
But, alas, for now it is a bunch of notes.
What is Hadoop Streaming?
Hadoop Streaming is a utility that ships with Hadoop and lets you run MapReduce jobs using any executable as the mapper or reducer: each task reads its input records from stdin, one per line, and writes key\tvalue lines to stdout. That means you can write your map and reduce logic in shell, Python, or anything else that reads and writes text streams.
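As a minimal sketch of that contract (the script name identity_mapper.py is just for illustration and is not used in the examples below), a streaming mapper is nothing more than a filter over stdin:
identity_mapper.py
#!/usr/bin/env python
# identity_mapper.py - illustrative only: demonstrates the Hadoop Streaming contract.
# A streaming mapper reads raw input records from stdin, one per line,
# and emits key\tvalue pairs on stdout for the shuffle/sort to group by key.
import sys

for line in sys.stdin:
    line = line.rstrip('\n')
    if line:
        # Use the whole input line as the key and 1 as the value.
        print('{0}\t1'.format(line))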
Sample Code
Count number of lines in SequenceFiles
Each map task runs wc -l over its share of the input, and the awk reducer sums the per-task counts:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.output.fileoutputformat.compress=false \
-input /wmf/data/raw/webrequest/webrequest_mobile/hourly/2014/10/01/00 \
-output /user/otto/tmp/streaming_test/out/12 \
-mapper 'wc -l' \
-reducer "awk '{s+=\$1} END {print s}'" \
-inputformat SequenceFileAsTextInputFormat
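If you would rather avoid the awk quoting, an equivalent summing reducer in Python might look like this (sum_reducer.py is an illustrative name; you would ship it with -files and pass -reducer sum_reducer.py instead of the awk command):
sum_reducer.py
#!/usr/bin/env python
# sum_reducer.py - illustrative alternative to the awk reducer above:
# sums the per-mapper line counts arriving on stdin.
import sys

total = 0
for line in sys.stdin:
    line = line.strip()
    if line:
        # Each input line is a count emitted by a 'wc -l' mapper task.
        total += int(line.split()[0])
print(total)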
Word Count with Python
Mapper code. It needs to output key\tval lines so the reducer can process them automatically.
mapper.py
#!/usr/bin/env python
import sys
import re

# read lines from stdin
for line in sys.stdin:
    # split the line by whitespace (spaces or tabs)
    words = re.split(r'\s+', line)
    # Print 'word\t1'. That is, the word, then a tab, then the number 1.
    print('\n'.join(["{0}\t1".format(word) for word in words if word]))
Reducer code. Mapper output is sorted by key during the shuffle phase before it is handed to the reducer, so all you gotta do is sum up the 1s for each run of identical words!
reducer.py
#!/usr/bin/env python
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    if not line:
        continue
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    count = int(count)
    if current_word == word:
        current_count += count
    # else we've moved on to the next word,
    # output the count we summed for the previous word.
    else:
        if current_word:
            print('{0}\t{1}'.format(current_word, current_count))
        # Move on to the next word.
        current_word = word
        current_count = count

# The loop only prints a word's total when it sees the next word,
# so output the last word's count here.
if current_word == word:
    print('{0}\t{1}'.format(current_word, current_count))
Test it out in the shell:
# Note that we need to sort the output of the mapper when testing in the shell. In Hadoop, this sorting happens
# automatically during the shuffle/sort phase between the map and reduce phases.
echo 'the is a word that appears many times in english like this maybe the the the the the' | ./mapper.py | sort | ./reducer.py
a 1
appears 1
english 1
in 1
is 1
like 1
many 1
maybe 1
that 1
the 6
this 1
times 1
word 1
Now try it in Hadoop Streaming. You'll need to tell hadoop-streaming.jar to ship both mapper.py and reducer.py along with the MapReduce job it creates; that is what the -files option below does.
Put a file in HDFS to test this on. We'll pass a directory as -input, so Hadoop will use every file inside it.
$ cat defenestration.txt
Defenestration is the act of throwing someone or something out of a window.[1] The term was coined around the time of an incident in Prague Castle in the year 1618. The word comes from the Latin de- (down or away from) and fenestra (window or opening).[2] Likewise, it can also refer to the condition of being thrown out of a window, as in "The Defenestration of Ermintrude Inch".[3] While the act of defenestration connotes the forcible or peremptory removal of an adversary, and the term is sometimes used in just that sense,[4] it also suggests breaking the windows in the process (de- also means removal). Although defenestrations can be fatal due to the height of the window through which a person is thrown or throws oneself or due to lacerations from broken glass, the act of defenestration need not carry the intent or result of death.

$ hdfs dfs -mkdir /tmp/defenestration
$ hdfs dfs -put defenestration.txt /tmp/defenestration/
Launch a Hadoop Streaming job:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.output.fileoutputformat.compress=false \
-files mapper.py,reducer.py \
-input /tmp/defenestration \
-output /tmp/defenestration.wordcount \
-mapper mapper.py \
-reducer reducer.py
The output should be in /tmp/defenestration.wordcount
$ hdfs dfs -cat /tmp/defenestration.wordcount/part*
"The 1
(de- 1
(down 1
(window 1
1618. 1
Although 1
Castle 1
Defenestration 2
Ermintrude 1
Inch".[3] 1
Latin 1
Likewise, 1
Prague 1
The 2
While 1
a 3
act 3
adversary, 1
also 3
an 2
and 2
around 1
as 1
away 1
be 1
being 1
breaking 1
broken 1
can 2
carry 1
coined 1
comes 1
condition 1
connotes 1
de- 1
death. 1
defenestration 2
defenestrations 1
due 2
fatal 1
fenestra 1
forcible 1
from 2
from) 1
glass, 1
height 1
in 5
incident 1
intent 1
is 3
it 2
just 1
lacerations 1
means 1
need 1
not 1
of 11
oneself 1
opening).[2] 1
or 7
out 2
peremptory 1
person 1
process 1
refer 1
removal 1
removal). 1
result 1
sense,[4] 1
someone 1
something 1
sometimes 1
suggests 1
term 2
that 1
the 14
through 1
throwing 1
thrown 2
throws 1
time 1
to 3
used 1
was 1
which 1
window 1
window, 1
window.[1] 1
windows 1
word 1
year 1
Shipping Python Modules
The above example didn't use anything other than Python scripts and standard Python libraries. Often, you will want to run Python code in Hadoop that uses custom modules not available on the Hadoop worker nodes. You can take advantage of Python's automatic zipimport module to zip up your module and ship it along with your streaming job via the -files option.
This example will use halfak's Mediawiki-Utilities Python module.
# clone the module from github:
git clone https://github.com/halfak/Mediawiki-Utilities.git mediawiki_utilities
# compress the mw directory into a zip file.
cd mediawiki_utilities && zip -r ../mediawiki_utilities.zip ./mw/ && cd ../
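Before shipping the zip, you can check locally that it imports cleanly via the same sys.path trick the mapper below uses (a quick sketch; check_zipimport.py is an illustrative name and assumes mediawiki_utilities.zip is in your current directory):
check_zipimport.py
#!/usr/bin/env python3
# check_zipimport.py - illustrative local check that the zipped module imports.
import sys

# Adding a .zip file to sys.path lets Python's zipimport machinery
# import packages stored inside the archive.
sys.path.append('mediawiki_utilities.zip')

import mw
from mw.xml_dump.iteration import Iterator

# If the import worked, mw was loaded from inside the zip file.
print(mw.__file__)
print(Iterator)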
We'll use the xml_dump submodule to parse an XML dump of revision content and compute counts per contributor username.
Our mapper needs to output the username as the key and a 1 as the value for each page's revision.
revision_usernames_mapper.py
#!/usr/bin/env python3
import sys

# If there is a .zip file on the Python sys.path,
# the python importer will automatically use
# zipimporter to allow you to import from this module.
sys.path.append('mediawiki_utilities.zip')

from mw.xml_dump.iteration import Iterator


def revision_usernames(dump):
    """Yields each page's revision's username in the dump."""
    for page in dump:
        for revision in page:
            yield(revision.contributor.user_text)


if __name__ == '__main__':
    dump = Iterator.from_file(sys.stdin)
    for username in revision_usernames(dump):
        # Emit username \t 1
        print('{0}\t1'.format(username))
Since all we are doing is counting keys, we can use the same reducer.py from the Python wordcount example above.
Unfortunately, this example will only work with a single mapper, because an XML dump cannot be split across multiple mappers and still parse correctly. We'll have to figure out something fancier, like the WikiHadoop approach in the next section.
# /user/otto/dump_small is a 10000 line test xml dump.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.output.fileoutputformat.compress=false \
-D mapred.map.tasks=1 \
-files revision_usernames_mapper.py,reducer.py,mediawiki_utilities.zip \
-input /user/otto/dump_small \
-output /user/otto/tmp/dumps_username_count \
-mapper revision_usernames_mapper.py \
-reducer reducer.py
Using WikiHadoop to parse XML dumps
WikiHadoop is a custom InputFormat for MediaWiki XML dumps. Build it, then pass the jar via -libjars and the class via -inputformat when you submit your Hadoop Streaming job.
In this example, we will still use Aaron's Mediawiki-Utilities to parse the data from WikiHadoop. However, since WikiHadoop emits individual page records that are not wrapped in a full XML document, we'll need to wrap the page XML with the expected XML header and footer.
revision_usernames_wikihadoop_mapper.py
#!/usr/bin/env python3
import sys
import io
# If there is a .zip file on the Python sys.path,
# the python importer will automatically use
# zipimporter to allow you to import from this module.
sys.path.append('mediawiki_utilities.zip')
from mw.xml_dump.iteration import Iterator
metaXML = """
<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.5/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.5/ http://www.mediawiki.org/xml/export-0.5.xsd" version="0.5" xml:lang="en">
<siteinfo>
<sitename>Wikipedia</sitename>
<base>http://en.wikipedia.org/wiki/Main_Page</base>
<generator>MediaWiki 1.17wmf1</generator>
<case>first-letter</case>
<namespaces>
<namespace key="-2" case="first-letter">Media</namespace>
<namespace key="-1" case="first-letter">Special</namespace>
<namespace key="0" case="first-letter" />
<namespace key="1" case="first-letter">Talk</namespace>
<namespace key="2" case="first-letter">User</namespace>
<namespace key="3" case="first-letter">User talk</namespace>
<namespace key="4" case="first-letter">Wikipedia</namespace>
<namespace key="5" case="first-letter">Wikipedia talk</namespace>
<namespace key="6" case="first-letter">File</namespace>
<namespace key="7" case="first-letter">File talk</namespace>
<namespace key="8" case="first-letter">MediaWiki</namespace>
<namespace key="9" case="first-letter">MediaWiki talk</namespace>
<namespace key="10" case="first-letter">Template</namespace>
<namespace key="11" case="first-letter">Template talk</namespace>
<namespace key="12" case="first-letter">Help</namespace>
<namespace key="13" case="first-letter">Help talk</namespace>
<namespace key="14" case="first-letter">Category</namespace>
<namespace key="15" case="first-letter">Category talk</namespace>
<namespace key="100" case="first-letter">Portal</namespace>
<namespace key="101" case="first-letter">Portal talk</namespace>
<namespace key="108" case="first-letter">Book</namespace>
<namespace key="109" case="first-letter">Book talk</namespace>
</namespaces>
</siteinfo>
"""
def revision_usernames(dump):
    """Yields each page's revision's username in the dump."""
    for page in dump:
        for revision in page:
            if revision.contributor:
                yield(revision.contributor.user_text)
            else:
                yield('_Unknown_')


if __name__ == '__main__':
    # WikiHadoop hands the mapper bare <page> records, so wrap them in the
    # <mediawiki> header above and a closing tag before handing them to the parser.
    xml_file = io.StringIO()
    xml_file.write(metaXML + sys.stdin.read() + '</mediawiki>')
    xml_file.seek(0)
    dump = Iterator.from_file(xml_file)
    for username in revision_usernames(dump):
        # Emit username \t 1
        print('{0}\t1'.format(username))
Submit the job:
# /user/otto/dump_small is a 10000 line test xml dump.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-libjars /srv/deployment/analytics/refinery/artifacts/wikihadoop.jar \
-D mapreduce.output.fileoutputformat.compress=false \
-D mapreduce.input.fileinputformat.split.minsize=300000000 \
-D mapreduce.task.timeout=6000000 \
-files revision_usernames_wikihadoop_mapper.py,reducer.py,mediawiki_utilities.zip \
-inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
-input /user/otto/dump_small/enwiki_dump.10000.xml.bz2 \
-output /user/otto/tmp/dumps_username_count.$(date +%s) \
-mapper revision_usernames_wikihadoop_mapper.py \
-reducer reducer.py