使用 Python 编写 Hadoop MapReduce 程序


Hadoop 框架使用 Java 开发的,对 Java 进行了原生的支持,不过对于其它语言也提供了 API 支持,如 Python 、 C++ 、 Perl 、 Ruby 等。这个工具就是 Hadoop Streaming ,顾名思义, Streaming 就是 Pipe 操作,说起 pipe ,大家肯定不陌生。最原生的 Python 支持是需要 Jython 支持的,不过这里有额外的方法来实现,大家如果只是使用的话,不用纠结 Jython 转换的问题。

最容易的 Hadoop 编程模型就是 Mapper 和 Reducer 的编写,这种编程模型大大降低了我们对于并发、同步、容错、一致性的要求,你只要编写好自己的业务逻辑,就可以提交任务。然后喝杯茶,结果就出来了,前提是你的业务逻辑没有错误。

使用 Hadoop Streaming ,能够利用 Pipe 模型,而使用 Python 的巧妙之处在于处理输入输出的数据使用的是 STDIN 和 STDOUT ,然后 Hadoop Streaming 会接管一切,转化成 MapReduce 模型。

我们还是使用 wordcount 例子,具体内容不再详细解释,如果有不理解的可以自行度之。下面我们先看下 mapper 的代码:

#!/usr/bin/env python  

import sys  
#input comes from STDIN (standard input)  
for line in sys.stdin:  
    # remove leading and trailing whitespace  
    line = line.strip()  
    # split the line into words  
    words = line.split()  
    # increase counters  
    for word in words:  
        # write the results to STDOUT (standard output);  
        # what we output here will be the input for the  
        # Reduce step, i.e. the input for reducer.py  
        # tab-delimited; the trivial word count is 1  
        print '%s\t%s' % (word, 1)

简单解释一下,输入从 sys.stdin 进入,然后进行分割操作,对于每行的分割结果,打印出 word 和 count=1 , Mapper 就这么简单。 大家看完 Mapper 之后,会产生疑问,这个怎么能够实现 mapper 功能?我们跳出这个 sys.stdin 模型,再回顾下 MapReduce 的程序。在 Mapper 中,程序不关心你怎么输入,只关心你的输出,这个 Mapper 代码会被放到各个 slave 机器上,去执行 Mapper 过程,其实可以理解为过滤、处理。 在示例中,程序的输入会被进行一系列的处理过程,得到 word 和 count ,这个就是 slave 机器上的数据处理之后的内容。仔细理解下这个过程,对于开发程序还是相当有帮助的。

下面我们来看下 Reduce 程序, wordcount 的 reduce 程序就是统计相同 word 的 count 数目,然后再输出。我们还是直接上代码吧:

#!/usr/bin/env python  
from operator import itemgetter  
import sys  

current_word = None  
current_count = 0  
word = None  
# input comes from STDIN  
for line in sys.stdin:  
    # remove leading and trailing whitespace  
    line = line.strip()  
    # parse the input we got from mapper.py  
    word, count = line.split('\t', 1)  
    # convert count (currently a string) to int  
    try:  
        count = int(count)  
    except ValueError:  
        # count was not a number, so silently  
        # ignore/discard this line  
        continue  
    # this IF-switch only works because Hadoop sorts map output  
    # by key (here: word) before it is passed to the reducer  
    if current_word == word:  
        current_count += count  
    else:  
        if current_word:  
            # write result to STDOUT  
            print '%s\t%s' % (current_word, current_count)  
        current_count = count  
        current_word = word  
# do not forget to output the last word if needed!  
if current_word == word:  
    print '%s\t%s' % (current_word, current_count)

reduce的代码页不复杂,利用Reduce程序,可以得出count数目。如果当前的词和分出来的词一致的话,count相加,如果不一致的话,就打印出来,同时更新输入的word和count。最后的if是打印出最后一次统计结果。 reduce的执行依赖了MapReduce模型一个要点,在Shuffle过程中,同一个key会放到同一个reduce任务中,这样处理的是一系列连续的相同的key值,当key不一样的时候,就是说开始统计下一个word了。 下来我们看看测试结果: 首先为他们增加可执行权限:

chmod +x mapper.py
chmod +x reducer.py

输出结果如下所示: testresult 下载测试文件:

wget http://www.gutenberg.org/cache/epub/20417/pg20417.txt
wget http://www.gutenberg.org/files/5000/5000-8.txt
wget http://www.gutenberg.org/files/4300/4300-0.txt

下面就是执行Hadoop命令了,在使用Hadoop Streaming时,要使用一定的格式操作才能提交任务: 首先,我们在HDFS中创建子目录MyFirst:

hadoop fs -mkdir  MyFirst    #创建目录
hadoop fs -copyFromLocal *.txt MyFirst  #拷贝文件
hadoop fs -ls MyFirst  #验证拷贝成功

如下图: settestdata 寻找你的streaming的jar文件存放地址,注意2.8的版本放到share目录下了,可以进入hadoop安装目录寻找该文件:

cd /usr/local/hadoop/
find ./ -name "*streaming*"

然后就会找到我们的share文件夹中的hadoop-straming*.jar文件: /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.8.2.jar 接下来提交mapreduce任务:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.8.2.jar -files ./mapper.py,./reducer.py -mapper ./mapper.py -reducer ./reducer.py -input MyFirst/* -output MyFirst-output

这里提醒下:

  1. 一定要把本地的输入文件转移到hdfs系统上面,否则无法识别你的input内容;

  2. 一定要有权限,一定要在你的hdfs系统下面建立你的个人文件夹否则就会被denied;

  3. 如果你是第一次在服务器上面玩hadoop,建议在这之前请在自己的虚拟机或者linux系统上面配置好伪分布式然后入门hadoop来的比较不那么头疼,之前我并不知道我在服务器上面运维没有给我运行的权限,后来在自己的虚拟机里面运行一下example实例以及wordcount才找到自己的错误。

好啦,然后不出意外,就会complete啦,如下图: 你就可以通过如下方式查看计数结果:

hadoop fs -cat MyFirst-output/* | sort -nk 2 | tail

如下图: output