map_reduce
Table of Contents

分布式计算

Hadoop MapReduce的出现,使得大数据计算通用编程成为可能。我们只要遵循MapReduce编程模型编写业务处理逻辑代码,就可以运行在Hadoop分布式集群上,无需关心分布式计算是如何完成的。也就是说,我们只需要关心业务逻辑,不用关心系统调用与运行环境,这和我们目前的主流开发方式是一致的。

Map Reduce 模型

MapReduce既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于MapReduce编程模型进行编程开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行

该编程模型只包含Map和Reduce两个过程,map的主要输入是一对值,经过map计算后输出一对值;然后将相同Key合并,形成;再将这个输入reduce,经过计算输出零个或多个对。

同时,MapReduce又是非常强大的,不管是关系代数运算(SQL计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过MapReduce编程来实现

处理过程

WordCount主要解决的是文本处理中词频统计的问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,几十KB到几MB的数据,只需要写一个程序,将数据读入内存,建一个Hash表记录每个词出现的次数就可以了。

img

# 文本前期处理
strl_ist = str.replace('\n', '').lower().split(' ')
count_dict = {}
# 如果字典里有该单词则加1,否则添加入字典
for str in strl_ist:
if str in count_dict.keys():
    count_dict[str] = count_dict[str] + 1
    else:
        count_dict[str] = 1

简单说来,就是建一个Hash表,然后将字符串里的每个词放到这个Hash表里。如果这个词第一次放到Hash表,就新建一个Key、Value对,Key是这个词,Value是1。如果Hash表里已经有这个词了,那么就给这个词的Value + 1。

小数据量用单机统计词频很简单,但是如果想统计全世界互联网所有网页(数万亿计)的词频数(而这正是Google这样的搜索引擎的典型需求),不可能写一个程序把全世界的网页都读入内存,这时候就需要用MapReduce编程来解决

WordCount的MapReduce程序如下

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
}

map函数的输入主要是一个对,在这个例子里,Value是要统计的所有文本中的一行数据,Key在一般计算中都不会用到。

public void map(Object key, Text value, Context context
                    )

map函数的计算过程是,将这行文本中的单词提取出来,针对每个单词输出一个这样的对。

MapReduce计算框架会将这些收集起来,将相同的word放在一起,形成>这样的数据,然后将其输入给reduce函数。

public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) 

这里reduce的输入参数Values就是由很多个1组成的集合,而Key就是具体的单词word。

reduce函数的计算过程是,将这个集合里的1求和,再将单词(word)和这个和(sum)组成一个,也就是输出。每一个输出就是一个单词和它的词频统计总和。

一个map函数可以针对一部分数据进行运算,这样就可以将一个大数据切分成很多块(这也正是HDFS所做的),MapReduce计算框架为每个数据块分配一个map函数去计算,从而实现大数据的分布式计算。

作业启动

img

以Hadoop 1为例,MapReduce运行过程涉及三类关键进程

大数据应用进程

这类进程是启动MapReduce程序的主入口,主要是指定Map和Reduce类、输入输出文件路径等,并提交作业给Hadoop集群,也就是下面提到的JobTracker进程。这是由用户启动的MapReduce程序进程,比如我们上期提到的WordCount程序。

JobTracker进程

作业的管理者,这类进程根据要处理的输入数据量,命令下面提到的TaskTracker进程启动相应数量的Map和Reduce进程任务,并管理整个作业生命周期的任务调度和监控(心跳信息)

这是Hadoop集群的常驻进程,需要注意的是,JobTracker进程在整个Hadoop集群全局唯一。

在Hadoop 1.X中,是一个JobTracker 带多个TaskTracker

TaskTracker进程

这个进程负责启动和管理Map进程以及Reduce进程。因为需要每个数据块都有对应的map函数,TaskTracker进程通常和HDFS的DataNode进程启动在同一个服务器。也就是说,Hadoop集群中绝大多数服务器同时运行DataNode进程和TaskTracker进程。

JobTracker进程和TaskTracker进程是主从关系,主服务器通常只有一台(或者另有一台备机提供高可用服务,但运行时只有一台服务器对外提供服务,真正起作用的只有一台),从服务器可能有几百上千台,所有的从服务器听从主服务器的控制和调度安排。主服务器负责为应用程序分配服务器资源以及作业执行的调度,而具体的计算操作则在从服务器上完成。

运行机制

MapReduce 1.X

image-20200112210713957

1.应用进程JobClient将用户作业JAR包存储在HDFS中,将来这些JAR包会分发给Hadoop集群中的服务器执行MapReduce计算。

2.应用程序提交job作业给JobTracker。

3.JobTracker根据作业调度策略创建JobInProcess树,每个作业都会有一个自己的JobInProcess树。

4.JobInProcess根据输入数据分片数目(通常情况就是数据块的数目)和设置的Reduce数目创建相应数量的TaskInProcess。

5.TaskTracker进程和JobTracker进程进行定时通信。

6.如果TaskTracker有空闲的计算资源(有空闲CPU核心),JobTracker就会给它分配任务。分配任务的时候会根据TaskTracker的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据,以实现我们一开始就提到的“移动计算比移动数据更划算”。

7.TaskTracker收到任务后根据任务类型(是Map还是Reduce)和任务参数(作业JAR包路径、输入数据文件路径、要处理的数据在文件中的起始位置和偏移量、数据块多个备份的DataNode主机名等),启动相应的Map或者Reduce进程。

8.Map或者Reduce进程启动后,检查本地是否有要执行任务的JAR包文件,如果没有,就去HDFS上下载,然后加载Map或者Reduce代码开始执行。

9.如果是Map进程,从HDFS读取数据(通常要读取的数据块正好存储在本机);如果是Reduce进程,将结果数据写出到HDFS。

做的仅仅是编写一个map函数和一个reduce函数就可以了,根本不用关心这两个函数是如何被分布启动到集群上的,也不用关心数据块又是如何分配给计算任务的。这一切都由MapReduce计算框架完成

MapReduce 2.X

image-20200112210652388

shuffle

在map输出与reduce输入之间,MapReduce计算框架处理数据合并与连接操作,这个操作有个专门的词汇叫shuffle

分布式计算需要将不同服务器上的相关数据合并到一起进行下一步计算,这就是shuffle

image-20200112210743247

每个Map任务的计算结果都会写入到本地文件系统,等Map任务快要计算完成的时候,MapReduce计算框架会启动shuffle过程,在Map任务进程调用一个Partitioner接口,对Map产生的每个进行Reduce分区选择,然后通过HTTP通信发送给对应的Reduce进程。这样不管Map位于哪个服务器节点,相同的Key一定会被发送给相同的Reduce进程。Reduce任务进程对收到的进行排序和合并,相同的Key放在一起,组成一个传递给Reduce执行。

map输出的shuffle到哪个Reduce进程是这里的关键,它是由Partitioner来实现,MapReduce框架默认的Partitioner用Key的哈希值对Reduce任务数量取模,相同的Key一定会落在相同的Reduce任务ID上。

从实现上来看的话,这样的Partitioner代码只需要一行。

 /** Use {@link Object#hashCode()} to partition. */ 
public int getPartition(K2 key, V2 value, int numReduceTasks) { 
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 
 }

不管是MapReduce还是Spark,只要是大数据批处理计算,一定都会有shuffle过程,只有让数据关联起来

shuffle也是整个MapReduce过程中最难、最消耗性能的地方,在MapReduce早期代码中,一半代码都是关于shuffle处理的。

多节点处理

image-20200112210806559

核心概念

image-20200112210828853

Split 最小计算单元

交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元

默认情况下,其和HDFS中的blocksize( HDFS中最小的存储单元,128M) 是一一对应的,不建议手动设置其之间的关系