鱼喃

听!布鲁布鲁,大鱼又在那叨叨了

Apache Hadoop(2)---编程模型MapReduce

简单但是足够强大。

分而治之

对于大量数据的处理,一般有两种途径:一是增大单机的性能,但是摩尔定律总有减缓甚至失效的那天,数据量的增长远远大于单机性能的提升速度;另外一个就是采取分布式的做法,将数据划分成若干可分割的块,然后用多台服务器去并行的处理。如果原始数据之间没有依赖关系,理论上只需要单机的1/N时间,而且可以线性扩展。

如果用一句话来概括Hadoop的编程模型MapReduce,我想应该是化繁为简,分而治之。MapReduce有两种组件,Mapper和Reducer。Mapper解决的是将原始任务划分成一个一个独立的子任务,而Reducer则负责解决这些子任务并将结果拼起来,得到原始问题的结果。

WordCount

下面通过一个简单的WordCount示例程序来介绍一下如何通过MapRduce实现数据的处理。我们的输入是一些文本文件,内容是若干篇文章,要求计算出每个单词的出现次数。

首先用常规的做法去解决这个问题,需要经过如下步骤:

  1. 初始化一个计数器字典,用于存储每个单词的出现次数
  2. 对于每一个文件,从第一行开始,分割单词
  3. 对于每一个单词,将计数器里的对应值加一
  4. 迭代直到处理完所有文件

那么如果文件数量很多呢,一个个处理会很慢,可以考虑启动多个线程,每个线程处理一个文件,最后再把结果合并。这就是分治的思想了。

相比较于多线程/多进程/多节点并行,Hadoop的优势不仅仅在于将任务自动并行化了,更关键的是它屏蔽了很多并行任务可能会出现的故障,用户不需要去关注各个子任务之间的关联和子任务失败处理等等问题。

Mapper

将上面的WordCount用MapReduce实现的话,只需要一步Map和一步Reduce操作即可完成,框架会自动启动若干Mapper,而每一个Mapper所需要做的就是等待框架”喂入”数据,每次的数据对应就是文件的一行,不需要去关注文件读写,只关注核心任务即可,也就是分割句子。

Mapper所需要做的就是对于每次传入的文本,分割,然后把每一个单词传递出去即可。

1
2
3
4
5
6
7
8
9
10
11
# WCMapper.java
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String words[] = line.split(" ");
for(String word: words) {
context.write(new Text(word), new IntWritable(1));
}
}
}

有几点需要补充一下:

  1. Mapper和Reducer需要继承Hadoop框架的Mapper和Reducer类,然后重载map/reduce方法
  2. 在MapReduce中,所有的输入输出都是以KV键值对的形式,如这里的Mapper输入是(LongWritable, Text),这些都是Hadoop里的类,对应long和string,这样写主要是为了实现序列化和反序列化
  3. 对于每一个单词,输出(word, 1)键值对,表示这个单词出现了一次
  4. 最上面四个类型分别表示输入和输出的键值对类型

Reducer

MapReduce 框架会自动合并/处理Mapper的输出,经过了若干步骤之后,Reducer的输入看起来会是 (string, list<int>),即自动按照每个单词把所有的Mapper发出的键值对汇总了。

这样的话,Reducer的工作也很简单,对于每次输入,只需要便利后面的列表,然后把值相加,最后再输出(word, count)键值对即可。

1
2
3
4
5
6
7
8
9
10
11
# WCReducer.java
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable value: values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}

遍历列表,然后把对应的加起来,考虑到Mapper输出的都是1,所以这里直接用size()也可以,不过为了后面的优化,写成这种。

Job Driver

写完了Mapper和Reducer,还需要一个驱动器来把它们连接起来。对于Hadoop,每个MapReduce对应一个任务,需要去定义这个任务的输入文件,输入文件格式,输出文件路径,输出文件格式,对应的Mapper和Reducer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Job job = Job.getInstance();
job.setJarByClass(WCMapreduce.class);
job.setJobName("WordCount");

job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true)?0:1);

首先新建一个Job实例;
然后设置入口类(也就是它本身),并设置任务的名字;
接着设置Mapper和Reducer;
再是设置输入和输出路径,可以是文件名也可以是路径,框架会自动识别。这里需要注意一个是add,一个是set,区别主要在于可以有多个输入文件/路径;
接着是Mapper和Reducer的输出键值对对应的类型,需要是Hadoop的类型;
最后是启动这个任务,可以选择等待任务完成,也可以启动之后就退出,让它后台执行,一般是提交完任务之后就退出。

将代码打包成jar,然后通过Hadoop接口执行任务即可

1
bin/hadoop jar wordcount.jar com.newnius.WCMapreduce /path/to/input /path/to/output

任务输出可以在HDFS的 /path/to/output 目录找到

1
bin/hdfs dfs -ls /path/to/output

Reducer

以上就可以完成基本的WordCount任务了,但是有没有想过,如果文本文件很多会带来两个问题,一个是Mapper和Reducer节点之间的网络通信量很大,另外一个就是Reducer接收到的列表也会很大,增加了内存开销和处理压力。

针对以上问题,MapReduce提供了Combiner机制,也就是对Mapper的输出做精简操作。在WordCount这个例子中,因为常用的单词就那么多,一篇文章中肯定会出现很多重复的单词,那么在发送到Reducer之前先把重复的单词键值对合并一下,这样Reducer端的压力就能大大减小了。

作为一个可选项,为了保证Reducer的输入是一样的,所以Combiner的输入输出需要跟Reducer保持一致,如果仔细观察就能发现,Combiner跟Reducer做的事情基本是完全一样的,所以不需要重新写一个Combiner,只需要复用Reducer,然后指定它为Combiner就可以了。

只需要在原有的代码上增加一行即可启用Combiner。可以对比一下启用了Combiner之后,网络传输和处理速度是否变快了。

1
job.setCombinerClass(WCReducer.class);

那么这么好的东西是不是每个任务都启用呢?答案是不一定,尽管可能降低网络通信开销,但是由于Combiner需要启动额外的容器和资源,也会消耗一定的时间,如果参与的节点很多而每个节点上其实没有太多能精简的东西,那么Combiner所带来的好处就很小了,反而拖慢了整体的执行速度。

所以是否启用Combiner需要取决于具体的任务和网络环境(百兆网跟万兆网肯定是不一样的)等等。

完整的代码可以在 wordcount | GitHub 找到

总结

通过分治的思想,Hadoop成功的将海量数据处理的复杂问题抽象成了Map和Reduce两种基本操作。

为了做到简单可靠,MapReduce只提供了Mapper和Reducer这两种组件(Combiner更像是一种优化设计),尽管这simple but powerful,但是过于简单的编程接口增加了用户的编码难度,一些常用的高级用法需要自己实现,另一个是很多任务可能很难用MapReduce的模式实现。