Hadoop streaming是和hadoop一起发布的实用程序,它允许用户创建和执行使用任何程序或者脚本编写的map或者reduce的mapreducejobs。如果你不熟悉Java或者C++,你可以使用Python/shell/PHP标准输入输出流非常方便的创建Hadoop任务。

1、Streaming原理
Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,因此在其他的编程语言所写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。mapper和 reducer都是能够从stdin逐行(line by line)读取输入的可执行文件,然后把处理完的结果发送到stdout。这个实用工具将会创建 一个Map/Reduce作业,并将作业提交到适当的集群,监控作业的运行进度直到作业运行完成。用法如下:

Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
options:
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
(8)-D:作业的一些属性(以前用的是-jonconf)

2、Python Mapper
相必大家对Hadoop原理已经了解了,下面以Python为例编写streaming MR任务。Mapper主要功能是解析原始数据,输出预期的Key=>value,在Python里面直接print即可。

import json
import sys

# 从标准输入中读数据
for line in sys.stdin:
    if not line:
        continue
    val = line.strip()
    if val:
        testlist= val.split("\t")
        if len(testlist) == 3:
            detail = {
                'v1': testlist[0],
                'v2': int(testlist[1])
            }
            # 标准输出数据
            print "%s\t%s" % (testlist[0], json.dumps(detail))

3、Python Reducer
Mapper之后的结果作为Shuffle过程的输入,Shuffle对Keys排序,结果输出给Reducer过程。

import json
import sys
last_key = None
last_path = []

for line in sys.stdin:
    if not line:
        continue
    line = line.strip()
    if not line:
        continue
    kv = line.split("\t")
    if len(kv) > 1:
        key = kv[0]
        val = kv[1]
        if last_key and last_key != key:
            last_path = sorted(last_path, key=lambda k: k['v2'])
            print "%s\t%s" % (last_key, json.dumps(last_path))
            last_path = []
            last_key = key
            last_path.append(json.loads(val))
        else:
            last_path.append(json.loads(val))
            last_key = key
if last_key:
    if len(last_path):
        last_path = sorted(last_path, key=lambda k: k['v2'])
        print "%s\t%s" % (last_key, json.dumps(last_path))

4、本地验证&提交
使用Hadoop streaming的好处是,可以本地终端模拟hadoop过程,方便的对mapper/reducer过程调试验证:

cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>

提交任务:

hadoop jar /xxx//hadoop-streaming-*4.jar -mapper mapper.py -file /xxx/mapper.py -reducer reducer.py -file /xxx/reducer.py -input /input/input.data -output /output

-file:传入mapper/reducer脚本的文件路径
-mapper/reducer:传入mapper/reducer脚本的文件名

标签: Hadoop, mr, Streaming

评论已关闭