Google 的 Map/Reduce
一直听闻Google的Map/Reduce很厉害,是处理大数据的利器,单独找时间学习了解了一下,总结记录如下:
源起:Map/Reduce名称的由来
Map/Reduce的思想最早起源于函数式编程语言Lisp的两个函数,map和reduce。其实Python也有类似的这两个函数。Map是把一组数据一对一的映射为另外的一组数据,其映射的规则由一个函数来指定。Reduce是对一组数据进行归约,这个归约的规则由一个函数指定。Map是一个把数据分开的过程,Reduce则是把分开的数据合并的过程。这么说可能有点抽象,举个简单的例子:
现在有一个需求,要将一个list [1,2,3,4]每个数字x2。最简单的方法,for循环遍历,每个数字x2,python代码如下:
<code class="python">a_list=[1,2,3,4]
for i in range(len(a_list)):
a_list[i]=a_list[i]*2
当然,也有更方便的方法,用python的列表生成式[x*2 for x in a_list],这里暂且不谈。
如果利用map函数来实现这个功能,则需要这样写:
<code class="python">a_list=[1,2,3,4]
def double(x):
return x*2
print(list(map(double,a_list)))
在Python3中,map函数会返回一个迭代器,python2中,map函数返回一个list。
reduce函数和map函数差不多,如果要将一个list中的所有元素相加,可以这么写:
<code class="python">a_list=[1,2,3,4]
def add(x,y)
return x+y
reduce(add,a_list)
Google的Map/Reduce集群
这样做的目的,就是便于大型计算任务的化整为零,从而打造大型的分布式计算系统。Google将计算资源分为三类:
- Master:负责计算任务的调度,发送计算任务到Mapper
- Mapper:负责实施具体的计算任务,并把计算结果输出到Reducer
- Reducer:负责统计Maper得到的计算数据,并把数据综合汇总
再举个简单的例子,统计一篇文章的词频。为了便于表述,这篇文章只有两句话:1 、 I love Spark 2、 I love Hadoop。如何利用Map/Reduce的思想来统计这两句话的词频呢?
假设有四台主机,一台Master,两台Mapper,一台Reducer。
Master接受这篇文章后,将第一句话发给Mapper1,第二句话发给Mapper2.
Mapper1和Mapper2便可以同时开始任务,统计词频。Mapper1得到了如下的结果:
<code class="md">I 1
love 1
Spark 1
Mapper2得到了如下的结果:
<code class="md">I 1
love 1
Hadoop 1
Mapper1和Mapper2将结果发送给Reducer,Reducer处理之后得到了这边文章的词频:
<code class="md">I 2
love 2
Hadoop 1
Spark 1
对于这两句话来说,在单机上性能似乎绰绰有余,但是一旦数据量变大,比如,需要对1T的文本数据进行词频统计时,单机的性能远远不够用,通过Map/Reduce后,多台Mapper并行工作,能够大大的提高计算的速度。下面这张图片更加详细的展示出了Map/Reduce的流程。
Spark和Hadoop
虽然上面的思想似乎很简单,但是真正的实施起来,还是比较麻烦的,比如,主机之间如何进行通信?任务怎样进行精确的划分?计算过程中出现错误怎么办?而Spark和Hadoop就是为了解决这一问题的。它们把并发、分布式(如机器间通信)和故障恢复等计算细节隐藏起来。
它们提供的高级 API 剥离了对集群本身的关注,应用开发者可以专注于应用所要做的计算本身,从而更加简单的构建出分布式计算系统。
MR编程
数据格式
IMDB电影评价数据集的文件压缩包中,提供了一些原始数据,这些数据的格式如下:
ratings.dat:
UserID::MovieID::Rating::Timestamp
users.dat:
UserID::Gender::Age::Occupation::Zip-code
movies.dat:
MovieID::Title::Genres
任务描述
问题 1:基于 ratings.dat 文件,找出至少评价了 30 部电影的用户,输出用户 ID;
问题 2:列出所有年龄为 35 岁以下的男性用户的用户 ID;
问题 3:给定一些电影名称,找出这些电影的类型。
编写代码
由于Hadoop由Java编写而成,所以它对Java编写的MR程序支持也比较好,这次实验,使用了Hadoop软件提供的Java软件包编写程序。
Hadoop提供拔插式的程序编写框架,需要分别编写MapperClass、ReducerClass以及其他函数,并将其绑定在MR
job上。
所以,针对这四个任务,分别编写Reducer、Mapper函数,然后再Java的主函数中,绑定这些函数到一个MR
job上,如下面代码所示:
<code class="java"><br></br>job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
编译及运行MR程序
由于编辑及调试程序在Windows上使用Intellij
Idea比较方便,所以需要在Idea软件中,导入Hadoop的jar包,然后编译代码为jar包,上传到hadoop-master上。
随后使用以下的命令上传指定的文件到hdfs之中:
hadoop fs -put /\*.dat /user/input
运行MR程序:
<br></br>hadoop jar task1.jar Task1 /user/input /user/output
附录
三个任务的代码如下:
问题一
<code class="java">import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Task1 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("::");
context.write(new Text( words[0]), one);
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value:values){
sum += value.get();
}
if (sum>=30) {
context.write(key, new IntWritable(sum));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
问题二
<code class="java">import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Task2 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("::");
if ( Integer.parseInt(words[2])< 35 && words[1].equals("M"))
context.write(new Text( words[0]), one);
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
context.write(key, new IntWritable(1));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
问题三
<code class="java">import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Task3 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = value.toString();
String[] aims ={"Last Summer in the Hamptons","Two Bits","Anne Frank Remembered"};
String[] words = line.split("::");
Boolean flag = false;
for (String aim:aims){
if(words[1].contains(aim)){
flag = true;
}
}
if ( flag) {
context.write(new Text(words[1]), new Text(words[2]));
}
}
}
public static class IntSumReducer
extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
for (Text value:values){
context.write(key, value);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}