标签 mr 下的文章

Hadoop streaming是和hadoop一起发布的实用程序,它允许用户创建和执行使用任何程序或者脚本编写的map或者reduce的mapreducejobs。如果你不熟悉Java或者C++,你可以使用Python/shell/PHP标准输入输出流非常方便的创建Hadoop任务。

1、Streaming原理
Hadoop Streaming 使用了 Unix 的标准输入输出作为 Hadoop 和其他编程语言的开发接口,因此在其他的编程语言所写的程序中,只需要将标准输入作为程序的输入,将标准输出作为程序的输出就可以了。mapper和 reducer都是能够从stdin逐行(line by line)读取输入的可执行文件,然后把处理完的结果发送到stdout。这个实用工具将会创建 一个Map/Reduce作业,并将作业提交到适当的集群,监控作业的运行进度直到作业运行完成。用法如下:

Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
options:
(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
(8)-D:作业的一些属性(以前用的是-jonconf)

2、Python Mapper
相必大家对Hadoop原理已经了解了,下面以Python为例编写streaming MR任务。Mapper主要功能是解析原始数据,输出预期的Key=>value,在Python里面直接print即可。

import json
import sys

# 从标准输入中读数据
for line in sys.stdin:
    if not line:
        continue
    val = line.strip()
    if val:
        testlist= val.split("\t")
        if len(testlist) == 3:
            detail = {
                'v1': testlist[0],
                'v2': int(testlist[1])
            }
            # 标准输出数据
            print "%s\t%s" % (testlist[0], json.dumps(detail))

- 阅读剩余部分 -

Hadoop pipes 是 Hadoop MapReduce 的 C++ 的接口代称。不同于使用标准输入和输出来实现 map 代码和reduce 代码之间的 Streaming。Pipes 使用套接字 socket 作为 tasktracker 与 C++版本函数的进程间的通讯,与 Streaming 不同,Pipes 是 Socket 通讯,Streaming 是标准输入输出。

1、Pipes架构设计
Hadoop-pipes-arch.jpg

Hadoop pipes允许用户用C++编写五个基本组件:mapper,reducer,partitioner,combiner,recordReader,这五个组件可以是Java编写的,也可以是C++编写的。

2、Mapper编写

#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"

class UBCMapper : public HadoopPipes::Mapper {
public:
    UBCMapper(HadoopPipes::TaskContext &context) {
    }
    void map(HadoopPipes::MapContext &context) {
        // 获取input中的一行数据
        std::string line = context.getInputValue();
        std::vector<std::string> split = HadoopUtils::splitString(line, "\t");
        if (split.size() == 8) {
            // 转化成新的keys-value
            context.emit(split[3], split[2]);
        }
    }
};

HadoopUtils提供了丰富的工具函数,包括字符串分割、字符串数值互转等。这里需要注意的是,字符串数值互转只支持32位长度,如果数值字符串长度较长,使用HadoopUtils转换会溢出,推荐使用boost将字符转为int64_t:

// string=>uint64_t
boost::lexical_cast<uint64_t>(str)
// uint64_t=>string
 std::stringstream ss;
ss << count;
std::cout<<ss.str();

- 阅读剩余部分 -