标签 Pipes 下的文章

Hadoop pipes 是 Hadoop MapReduce 的 C++ 的接口代称。不同于使用标准输入和输出来实现 map 代码和reduce 代码之间的 Streaming。Pipes 使用套接字 socket 作为 tasktracker 与 C++版本函数的进程间的通讯,与 Streaming 不同,Pipes 是 Socket 通讯,Streaming 是标准输入输出。

1、Pipes架构设计
Hadoop-pipes-arch.jpg

Hadoop pipes允许用户用C++编写五个基本组件:mapper,reducer,partitioner,combiner,recordReader,这五个组件可以是Java编写的,也可以是C++编写的。

2、Mapper编写

#include "Pipes.hh"
#include "TemplateFactory.hh"
#include "StringUtils.hh"

class UBCMapper : public HadoopPipes::Mapper {
public:
    UBCMapper(HadoopPipes::TaskContext &context) {
    }
    void map(HadoopPipes::MapContext &context) {
        // 获取input中的一行数据
        std::string line = context.getInputValue();
        std::vector<std::string> split = HadoopUtils::splitString(line, "\t");
        if (split.size() == 8) {
            // 转化成新的keys-value
            context.emit(split[3], split[2]);
        }
    }
};

HadoopUtils提供了丰富的工具函数,包括字符串分割、字符串数值互转等。这里需要注意的是,字符串数值互转只支持32位长度,如果数值字符串长度较长,使用HadoopUtils转换会溢出,推荐使用boost将字符转为int64_t:

// string=>uint64_t
boost::lexical_cast<uint64_t>(str)
// uint64_t=>string
 std::stringstream ss;
ss << count;
std::cout<<ss.str();

- 阅读剩余部分 -