Hadoop pipes 是 Hadoop MapReduce 的 C++ 的接口代称。不同于使用标准输入和输出来实现 map 代码和reduce 代码之间的 Streaming。Pipes 使用套接字 socket 作为 tasktracker 与 C++版本函数的进程间的通讯,与 Streaming 不同,Pipes 是 Socket 通讯,Streaming 是标准输入输出。

1、Pipes架构设计
Hadoop-pipes-arch.jpg

Hadoop pipes允许用户用C++编写五个基本组件:mapper,reducer,partitioner,combiner,recordReader,这五个组件可以是Java编写的,也可以是C++编写的。

2、Mapper编写

#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"

class UBCMapper : public HadoopPipes::Mapper {
public:
    UBCMapper(HadoopPipes::TaskContext &context) {
    }
    void map(HadoopPipes::MapContext &context) {
        // 获取input中的一行数据
        std::string line = context.getInputValue();
        std::vector<std::string> split = HadoopUtils::splitString(line, "\t");
        if (split.size() == 8) {
            // 转化成新的keys-value
            context.emit(split[3], split[2]);
        }
    }
};

HadoopUtils提供了丰富的工具函数,包括字符串分割、字符串数值互转等。这里需要注意的是,字符串数值互转只支持32位长度,如果数值字符串长度较长,使用HadoopUtils转换会溢出,推荐使用boost将字符转为int64_t:

// string=>uint64_t
boost::lexical_cast<uint64_t>(str)
// uint64_t=>string
 std::stringstream ss;
ss << count;
std::cout<<ss.str();

3、Reducer编写

class UBCReducer : public HadoopPipes::Reducer {
public:
    UBCReducer(HadoopPipes::TaskContext &context) {
    }
    void reduce(HadoopPipes::ReduceContext &context) {
        std::string maxStr = "";
        while (context.nextValue()) {
            std::vector<std::string> split = HadoopUtils::splitString(context.getInputValue(), "_");
            if(split[1] > maxStr){
                maxStr = split[1];
            }
        }
        context.emit(context.getInputKey(), maxStr);
    }
};

shuffle排序之后的结果输出到Reducer,context.nextValue()循环中的遍历为同一个key的结果,可以通过context.getInputValue()获取,最后emit Reducer的结果。

4、编译执行
(1) 编写Makefile文件:

CC = g++  
CPPINC = -m64 -I/xxx/hadoop-2.7.4/include
CPPLIB = -L/xxx/hadoop-2.7.4/lib/native
UBC_MR: main.cpp
    $(CC) $(CPPINC) $< -Wall $(CPPLIB) -lhadooppipes -lcrypto -lhadooputils -lpthread -g -O2 -o $@

接下来执行make命令。
(2) 提交任务

hdfs dfs -put ./UBC_MR /bin/
mapred pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -program /bin/UBC_MR -input /test.data -output /testoutput

5、遇到的问题
(1) boost not decleared
若提示缺少boost,则在CPPINC -I boost include文件目录。
(2) java null exception
Hadoop Pipes不能运行在standalone形式下,单机启动yarn搭建伪分布式解决。
(3) beyond memory limits
In mapred-site.xml:

<property>
    <name>mapreduce.map.memory.mb</name>
    <value>4096</value>
</property>
<property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>8192</value>
</property>

标签: C++, Hadoop, Pipes, mr

评论已关闭