Hadoop Streaming实现MR
Hadoop streaming是和hadoop一起发布的实用程序,它允许用户创建和执行使用任何程序或者脚本编写的map或者reduce的mapreducejobs。如果你不熟悉Java或者C++,你可以使用Python/shell/PHP标准输入输出流非常方便的创建Hadoop任务。
1、Streaming原理
Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,因此在其他的编程语言所写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。mapper和 reducer都是能够从stdin逐行(line by line)读取输入的可执行文件,然后把处理完的结果发送到stdout。这个实用工具将会创建 一个Map/Reduce作业,并将作业提交到适当的集群,监控作业的运行进度直到作业运行完成。用法如下:
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
options:
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
(8)-D:作业的一些属性(以前用的是-jonconf)
2、Python Mapper
相必大家对Hadoop原理已经了解了,下面以Python为例编写streaming MR任务。Mapper主要功能是解析原始数据,输出预期的Key=>value
,在Python里面直接print
即可。
import json
import sys
# 从标准输入中读数据
for line in sys.stdin:
if not line:
continue
val = line.strip()
if val:
testlist= val.split("\t")
if len(testlist) == 3:
detail = {
'v1': testlist[0],
'v2': int(testlist[1])
}
# 标准输出数据
print "%s\t%s" % (testlist[0], json.dumps(detail))
3、Python Reducer
Mapper之后的结果作为Shuffle过程的输入,Shuffle对Keys排序,结果输出给Reducer过程。
import json
import sys
last_key = None
last_path = []
for line in sys.stdin:
if not line:
continue
line = line.strip()
if not line:
continue
kv = line.split("\t")
if len(kv) > 1:
key = kv[0]
val = kv[1]
if last_key and last_key != key:
last_path = sorted(last_path, key=lambda k: k['v2'])
print "%s\t%s" % (last_key, json.dumps(last_path))
last_path = []
last_key = key
last_path.append(json.loads(val))
else:
last_path.append(json.loads(val))
last_key = key
if last_key:
if len(last_path):
last_path = sorted(last_path, key=lambda k: k['v2'])
print "%s\t%s" % (last_key, json.dumps(last_path))
4、本地验证&提交
使用Hadoop streaming的好处是,可以本地终端模拟hadoop过程,方便的对mapper/reducer过程调试验证:
cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>
提交任务:
hadoop jar /xxx//hadoop-streaming-*4.jar -mapper mapper.py -file /xxx/mapper.py -reducer reducer.py -file /xxx/reducer.py -input /input/input.data -output /output
-file:传入mapper/reducer脚本的文件路径
-mapper/reducer:传入mapper/reducer脚本的文件名
评论已关闭