Hadoop Pipes C++实现MR
Hadoop pipes 是 Hadoop MapReduce 的 C++ 的接口代称。不同于使用标准输入和输出来实现 map 代码和reduce 代码之间的 Streaming。Pipes 使用套接字 socket 作为 tasktracker 与 C++版本函数的进程间的通讯,与 Streaming 不同,Pipes 是 Socket 通讯,Streaming 是标准输入输出。
1、Pipes架构设计
Hadoop pipes允许用户用C++编写五个基本组件:mapper,reducer,partitioner,combiner,recordReader,这五个组件可以是Java编写的,也可以是C++编写的。
2、Mapper编写
#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"
class UBCMapper : public HadoopPipes::Mapper {
public:
UBCMapper(HadoopPipes::TaskContext &context) {
}
void map(HadoopPipes::MapContext &context) {
// 获取input中的一行数据
std::string line = context.getInputValue();
std::vector<std::string> split = HadoopUtils::splitString(line, "\t");
if (split.size() == 8) {
// 转化成新的keys-value
context.emit(split[3], split[2]);
}
}
};
HadoopUtils提供了丰富的工具函数,包括字符串分割、字符串数值互转等。这里需要注意的是,字符串数值互转只支持32位长度,如果数值字符串长度较长,使用HadoopUtils转换会溢出,推荐使用boost将字符转为int64_t:
// string=>uint64_t
boost::lexical_cast<uint64_t>(str)
// uint64_t=>string
std::stringstream ss;
ss << count;
std::cout<<ss.str();
3、Reducer编写
class UBCReducer : public HadoopPipes::Reducer {
public:
UBCReducer(HadoopPipes::TaskContext &context) {
}
void reduce(HadoopPipes::ReduceContext &context) {
std::string maxStr = "";
while (context.nextValue()) {
std::vector<std::string> split = HadoopUtils::splitString(context.getInputValue(), "_");
if(split[1] > maxStr){
maxStr = split[1];
}
}
context.emit(context.getInputKey(), maxStr);
}
};
shuffle排序之后的结果输出到Reducer,context.nextValue()
循环中的遍历为同一个key的结果,可以通过context.getInputValue()
获取,最后emit Reducer的结果。
4、编译执行
(1) 编写Makefile文件:
CC = g++
CPPINC = -m64 -I/xxx/hadoop-2.7.4/include
CPPLIB = -L/xxx/hadoop-2.7.4/lib/native
UBC_MR: main.cpp
$(CC) $(CPPINC) $< -Wall $(CPPLIB) -lhadooppipes -lcrypto -lhadooputils -lpthread -g -O2 -o $@
接下来执行make
命令。
(2) 提交任务
hdfs dfs -put ./UBC_MR /bin/
mapred pipes -D hadoop.pipes.java.recordreader=true -D hadoop.pipes.java.recordwriter=true -program /bin/UBC_MR -input /test.data -output /testoutput
5、遇到的问题
(1) boost not decleared
若提示缺少boost,则在CPPINC -I boost include文件目录。
(2) java null exception
Hadoop Pipes不能运行在standalone形式下,单机启动yarn搭建伪分布式解决。
(3) beyond memory limits
In mapred-site.xml:
<property>
<name>mapreduce.map.memory.mb</name>
<value>4096</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>8192</value>
</property>
评论已关闭