In this tutorial I will describe how to write a simple MapReduce program for Hadoop in the Python programming language.

  • Motivation
  • What we want to do
  • Prerequisites
  • Python MapReduce Code
    • Map step: mapper.py
  • Reduce step: reducer.py
  • Test your code (cat data | map | sort | reduce)
  • Running the Python Code on Hadoop
    • Download example input data
    • Copy local example data to HDFS
    • Run the MapReduce job
  • Improved Mapper and Reducer code: using Python iterators and generators
    • mapper.py
    • reducer.py
  • Related Links

Motivation

Even though the Hadoop framework is written in Java, programs for Hadoop need not be coded in Java but can also be developed in other languages like Python or C++ (the latter since version 0.14.1). However, Hadoop's documentation and the most prominent Python example on the Hadoop website could make you think that you must translate your Python code using Jython into a Java jar file. Obviously, this is not very convenient and can even be problematic if you depend on Python features not provided by Jython. Another issue of the Jython approach is the overhead of writing your Python program in such a way that it can interact with Hadoop – just have a look at the example in $HADOOP_HOME/src/examples/python/WordCount.py and you see what I mean.

That said, the ground is now prepared for the purpose of this tutorial: writing a Hadoop MapReduce program in a more Pythonic way, i.e. in a way you should be familiar with.

What we want to do

We will write a simple MapReduce program (see also the MapReduce article on Wikipedia) for Hadoop in Python but without using Jython to translate our code to Java jar files.

Our program will mimic WordCount, i.e. it reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.
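To make the expected result concrete before we distribute anything, here is a plain, non-Hadoop Python 3 sketch of the same word count using collections.Counter; the function name word_count is made up for illustration and is not part of the MapReduce job:

```python
from collections import Counter

def word_count(text):
    # count word occurrences the same way the MapReduce job will:
    # whitespace-split tokens, one count per occurrence
    return Counter(text.split())

counts = word_count("foo foo quux labs foo bar quux")
# each output line of the job has the form "<word>\t<count>"
for word, count in sorted(counts.items()):
    print('%s\t%d' % (word, count))
```

The MapReduce job computes exactly this, just spread across many mappers and reducers.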

Note: You can also use programming languages other than Python such as Perl or Ruby with the "technique" described in this tutorial.

Prerequisites

You should have a Hadoop cluster up and running because we will get our hands dirty. If you don't have a cluster yet, my following tutorials might help you build one. The tutorials are tailored to Ubuntu Linux but the information also applies to other Linux/Unix variants.

  • Running Hadoop On Ubuntu Linux (Single-Node Cluster) – How to set up a pseudo-distributed, single-node Hadoop cluster backed by the Hadoop Distributed File System (HDFS)
  • Running Hadoop On Ubuntu Linux (Multi-Node Cluster) – How to set up a distributed, multi-node Hadoop cluster backed by the Hadoop Distributed File System (HDFS)

Python MapReduce Code

The "trick" behind the following Python code is that we will use the Hadoop Streaming API (see also the corresponding wiki entry) to help us pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We will simply use Python's sys.stdin to read input data and print our own output to sys.stdout. That's all we need to do because Hadoop Streaming will take care of everything else!
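The core pattern of any Streaming script – iterate over input lines, write transformed lines to output – can be shown with in-memory streams standing in for STDIN and STDOUT; this is a Python 3 illustration with made-up names, not part of the tutorial's scripts:

```python
import io

def uppercase_stream(stdin, stdout):
    # a Streaming-style script is nothing more than this pattern:
    # read lines from the input stream, write results to the output stream
    for line in stdin:
        stdout.write(line.strip().upper() + '\n')

# simulate STDIN/STDOUT with in-memory text streams
fake_in = io.StringIO("hello\nworld\n")
fake_out = io.StringIO()
uppercase_stream(fake_in, fake_out)
```

In a real Streaming job you would pass sys.stdin and sys.stdout instead of the fake streams.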

Map step: mapper.py

Save the following code in the file /home/hduser/mapper.py. It will read data from STDIN, split it into words and output a list of lines mapping words to their (intermediate) counts to STDOUT. The Map script will not compute an (intermediate) sum of a word's occurrences though. Instead, it will output <word> 1 tuples immediately – even though a specific word might occur multiple times in the input. In our case we let the subsequent Reduce step do the final sum count. Of course, you can change this behavior in your own scripts as you please, but we will keep it like that in this tutorial for didactic reasons. :-)

Make sure the file has execution permission (chmod +x /home/hduser/mapper.py should do the trick) or you will run into problems.

```python
#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
```

Reduce step: reducer.py

Save the following code in the file /home/hduser/reducer.py. It will read the results of mapper.py from STDIN (so the output format of mapper.py and the expected input format of reducer.py must match) and sum the occurrences of each word to a final count, then output its results to STDOUT.

Make sure the file has execution permission (chmod +x /home/hduser/reducer.py should do the trick) or you will run into problems.

```python
#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
```
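To see why the reducer depends on Hadoop's sort, here is the same IF-switch logic as a standalone Python 3 function (reduce_counts is a made-up name for illustration), applied to sorted and unsorted input:

```python
def reduce_counts(pairs):
    # apply the reducer's IF-switch to an iterable of (word, count) pairs
    results = []
    current_word, current_count = None, 0
    for word, count in pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, count
    # do not forget the last word
    if current_word is not None:
        results.append((current_word, current_count))
    return results

pairs = [('foo', 1), ('bar', 1), ('foo', 1)]
unsorted_result = reduce_counts(pairs)         # 'foo' is emitted twice - wrong
sorted_result = reduce_counts(sorted(pairs))   # each word emitted once - correct
```

Without sorting, equal keys are not adjacent and the logic emits a word more than once; this is exactly the guarantee Hadoop's shuffle-and-sort phase provides.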

Test your code (cat data | map | sort | reduce)

I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job. Otherwise your jobs might complete successfully but there will be no job result data at all, or not the results you would have expected. If that happens, most likely it was you (or me) who screwed up.

Here are some ideas on how to test the functionality of the Map and Reduce scripts.

```
# Test mapper.py and reducer.py locally first

# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1

hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar     1
foo     3
labs    1
quux    2

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
 The     1
 Project 1
 Gutenberg       1
 EBook   1
 of      1
[...]
(you get the idea)
```

Running the Python Code on Hadoop

Download example input data

We will use three ebooks from Project Gutenberg for this example:

  • The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
  • The Notebooks of Leonardo Da Vinci
  • Ulysses by James Joyce

Download each ebook as a text file in Plain Text UTF-8 encoding and store the files in a local temporary directory of choice, for example /tmp/gutenberg.

```
hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$
```

Copy local example data to HDFS

Before we run the actual MapReduce job, we must first copy the files from our local file system to Hadoop's HDFS.

```
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$
```

Run the MapReduce job

Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above, we leverage the Hadoop Streaming API to help us pass data between our Map and Reduce code via STDIN and STDOUT.

```
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
    -file /home/hduser/mapper.py    -mapper /home/hduser/mapper.py \
    -file /home/hduser/reducer.py   -reducer /home/hduser/reducer.py \
    -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
```

If you want to modify some Hadoop settings on the fly, like increasing the number of Reduce tasks, you can use the -D option:

```
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...
```

Note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user specified mapred.reduce.tasks and doesn't manipulate that. You cannot force mapred.map.tasks but you can specify mapred.reduce.tasks.

The job will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in the HDFS directory /user/hduser/gutenberg-output. In general Hadoop will create one output file per reducer; in our case however it will only create a single file because the input files are very small.

Example output of the previous command in the console:

```
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/] [] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob:  map 0%  reduce 0%
[...] INFO streaming.StreamJob:  map 43%  reduce 0%
[...] INFO streaming.StreamJob:  map 86%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 33%
[...] INFO streaming.StreamJob:  map 100%  reduce 70%
[...] INFO streaming.StreamJob:  map 100%  reduce 77%
[...] INFO streaming.StreamJob:  map 100%  reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$
```

As you can see in the output above, Hadoop also provides a basic web interface for statistics and information. When the Hadoop cluster is running, open http://localhost:50030/ in a browser and have a look around. Here's a screenshot of the Hadoop web interface for the job we just ran.

Figure 1: A screenshot of Hadoop's JobTracker web interface, showing the details of the MapReduce job we just ran

Check if the result is successfully stored in the HDFS directory /user/hduser/gutenberg-output:

```
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
hduser@ubuntu:/usr/local/hadoop$
```

You can then inspect the contents of the file with the dfs -cat command:

```
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        2
"A_     1
"Absoluti       1
[...]
hduser@ubuntu:/usr/local/hadoop$
```

Note that in this specific output above the quote signs (") enclosing the words have not been inserted by Hadoop. They are the result of how our Python code splits words, and in this case it matched the beginning of a quote in the ebook texts. Just inspect the part-00000 file further to see it for yourself.
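If you wanted cleaner tokens than plain whitespace splitting produces, one possible refinement (not what the tutorial's scripts do) is to extract word characters with a regular expression; the function name tokenize and the exact pattern are illustrative choices:

```python
import re

def tokenize(line):
    # keep only alphanumeric word tokens, dropping punctuation
    # such as the quote signs seen in the output above
    return re.findall(r"[A-Za-z0-9_']+", line.lower())

tokens = tokenize('"AS-IS". She said "hello"')
```

You could drop this into mapper.py in place of line.split(); note that it also lowercases and splits hyphenated words, which changes the counts.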

Improved Mapper and Reducer code: using Python iterators and generators

The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application. The focus was code simplicity and ease of understanding, particularly for beginners of the Python programming language. In a real-world application however, you might want to optimize your code by using Python iterators and generators (an even better introduction in PDF).

Generally speaking, iterators and generators (functions that create iterators, for example with Python's yield statement) have the advantage that an element of a sequence is not produced until you actually need it. This can help a lot in terms of computational expensiveness or memory consumption depending on the task at hand.
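This laziness can be shown in a few lines of Python 3 (read_words is a made-up name for illustration): nothing is computed until the consumer requests a value.

```python
def read_words(lines):
    # a generator function: no work happens until a value is requested
    for line in lines:
        for word in line.split():
            yield word

words = read_words(["foo bar", "baz"])
first = next(words)   # only now is the first line actually split
rest = list(words)    # the remaining words, produced on demand
```

The same pattern lets the improved scripts below stream through input of any size without holding it all in memory.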

Note: The following Map and Reduce scripts will only work "correctly" when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command "cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py" will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.

Precisely, we compute the sum of a word's occurrences, e.g. ("foo", 4), only if by chance the same word (foo) appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs between the Map and the Reduce step because Hadoop is more efficient in this regard than our simple Python scripts.
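This "only in succession" behavior is exactly how itertools.groupby works, as this small Python 3 sketch shows: it merges only consecutive equal keys, so ungrouped input produces several groups per word while sorted input (what Hadoop delivers to the reducer) produces exactly one.

```python
from itertools import groupby
from operator import itemgetter

pairs = [('foo', 1), ('bar', 1), ('foo', 1), ('foo', 1)]

# groupby only merges *consecutive* equal keys, so without sorting
# the word 'foo' ends up in two separate groups
unsorted_sums = [(w, sum(c for _, c in g))
                 for w, g in groupby(pairs, itemgetter(0))]

# after sorting (which Hadoop does between Map and Reduce),
# each word forms exactly one group
sorted_sums = [(w, sum(c for _, c in g))
               for w, g in groupby(sorted(pairs), itemgetter(0))]
```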

mapper.py

```python
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
```

reducer.py

```python
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
```

Related Links

From yours truly:

  • Running Hadoop On Ubuntu Linux (Single-Node Cluster)
  • Running Hadoop On Ubuntu Linux (Multi-Node Cluster)