Google 的 Map/Reduce

Google 的 Map/Reduce

一直听闻Google的Map/Reduce很厉害,是处理大数据的利器,单独找时间学习了解了一下,总结记录如下:

源起:Map/Reduce名称的由来

Map/Reduce的思想最早起源于函数式编程语言Lisp的两个函数,map和reduce。其实Python也有类似的这两个函数。Map是把一组数据一对一的映射为另外的一组数据,其映射的规则由一个函数来指定。Reduce是对一组数据进行归约,这个归约的规则由一个函数指定。Map是一个把数据分开的过程,Reduce则是把分开的数据合并的过程。这么说可能有点抽象,举个简单的例子:

现在有一个需求,要将一个list [1,2,3,4]每个数字x2。最简单的方法,for循环遍历,每个数字x2,python代码如下:

<code class="python">a_list=[1,2,3,4]
for i in range(len(a_list)):
    a_list[i]=a_list[i]*2

当然,也有更方便的方法,用python的列表生成式[x*2 for x in a_list],这里暂且不谈。

如果利用map函数来实现这个功能,则需要这样写:

<code class="python">a_list=[1,2,3,4]

def double(x):
    return x*2
print(list(map(double,a_list)))

在Python3中,map函数会返回一个迭代器,python2中,map函数返回一个list。
reduce函数和map函数差不多,如果要将一个list中的所有元素相加,可以这么写:

<code class="python">a_list=[1,2,3,4]
def add(x,y)
    return x+y
reduce(add,a_list)

Google的Map/Reduce集群

这样做的目的,就是便于大型计算任务的化整为零,从而打造大型的分布式计算系统。Google将计算资源分为三类:

  • Master:负责计算任务的调度,发送计算任务到Mapper
  • Mapper:负责实施具体的计算任务,并把计算结果输出到Reducer
  • Reducer:负责统计Maper得到的计算数据,并把数据综合汇总

再举个简单的例子,统计一篇文章的词频。为了便于表述,这篇文章只有两句话:1 、 I love Spark 2、 I love Hadoop。如何利用Map/Reduce的思想来统计这两句话的词频呢?
假设有四台主机,一台Master,两台Mapper,一台Reducer。

Master接受这篇文章后,将第一句话发给Mapper1,第二句话发给Mapper2.

Mapper1和Mapper2便可以同时开始任务,统计词频。Mapper1得到了如下的结果:

<code class="md">I         1
love    1
Spark  1

Mapper2得到了如下的结果:

<code class="md">I         1
love    1
Hadoop  1

Mapper1和Mapper2将结果发送给Reducer,Reducer处理之后得到了这边文章的词频:

<code class="md">I         2
love    2
Hadoop  1
Spark     1

对于这两句话来说,在单机上性能似乎绰绰有余,但是一旦数据量变大,比如,需要对1T的文本数据进行词频统计时,单机的性能远远不够用,通过Map/Reduce后,多台Mapper并行工作,能够大大的提高计算的速度。下面这张图片更加详细的展示出了Map/Reduce的流程。

Spark和Hadoop

虽然上面的思想似乎很简单,但是真正的实施起来,还是比较麻烦的,比如,主机之间如何进行通信?任务怎样进行精确的划分?计算过程中出现错误怎么办?而Spark和Hadoop就是为了解决这一问题的。它们把并发、分布式(如机器间通信)和故障恢复等计算细节隐藏起来。
它们提供的高级 API 剥离了对集群本身的关注,应用开发者可以专注于应用所要做的计算本身,从而更加简单的构建出分布式计算系统。

MR编程

数据格式

IMDB电影评价数据集的文件压缩包中,提供了一些原始数据,这些数据的格式如下:

ratings.dat:

UserID::MovieID::Rating::Timestamp

users.dat:

UserID::Gender::Age::Occupation::Zip-code

movies.dat:

MovieID::Title::Genres

任务描述

问题 1:基于 ratings.dat 文件,找出至少评价了 30 部电影的用户,输出用户 ID;

问题 2:列出所有年龄为 35 岁以下的男性用户的用户 ID;

问题 3:给定一些电影名称,找出这些电影的类型。

编写代码

由于Hadoop由Java编写而成,所以它对Java编写的MR程序支持也比较好,这次实验,使用了Hadoop软件提供的Java软件包编写程序。

Hadoop提供拔插式的程序编写框架,需要分别编写MapperClass、ReducerClass以及其他函数,并将其绑定在MR
job上。

所以,针对这四个任务,分别编写Reducer、Mapper函数,然后再Java的主函数中,绑定这些函数到一个MR
job上,如下面代码所示:

<code class="java"><br></br>job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

编译及运行MR程序

由于编辑及调试程序在Windows上使用Intellij
Idea比较方便,所以需要在Idea软件中,导入Hadoop的jar包,然后编译代码为jar包,上传到hadoop-master上。

随后使用以下的命令上传指定的文件到hdfs之中:

hadoop fs -put /\*.dat /user/input

运行MR程序:

<br></br>hadoop jar task1.jar Task1 /user/input /user/output

附录

三个任务的代码如下:

问题一

<code class="java">import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Task1 {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words =  line.split("::");
            context.write(new Text( words[0]), one);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value:values){
                sum += value.get();
            }
            if (sum>=30) {
                context.write(key, new IntWritable(sum));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

问题二

<code class="java">import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Task2 {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words =  line.split("::");
            if ( Integer.parseInt(words[2])< 35 && words[1].equals("M"))
            context.write(new Text( words[0]), one);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {


                context.write(key, new IntWritable(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

问题三

<code class="java">import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Task3 {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, Text>{

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            String line = value.toString();
            String[] aims ={"Last Summer in the Hamptons","Two Bits","Anne Frank Remembered"};
            String[] words =  line.split("::");
            Boolean flag = false;
            for (String aim:aims){
                if(words[1].contains(aim)){
                    flag = true;
                }
            }
            if ( flag) {
                context.write(new Text(words[1]), new Text(words[2]));
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text,Text,Text,Text> {

        public void reduce(Text key, Iterable<Text> values,
                           Context context
        ) throws IOException, InterruptedException {
            for (Text value:values){
                context.write(key, value);
            }

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}