hadoop理论课-第五章Mapreduce


Mapreduce

MapReduce概述

MapReduce是什么?

Mapreduce是一种简化并行计算的编程模型,用于进行大数据量的计算

GjRro8.md.png

MapReduce设计思想

GjRDdf.md.png

MapReduce特点

易于编程

良好的扩展性

高容错性

擅长对PB级以上海量数据进行离线处理

MapReduce不擅长的场景

实时计算

MapReduce无法像MySQL一样,在毫秒或秒级内返回结果

流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,动态变化

DAG(有向图)计算

MapReduce编程模型

初识MapReduce模型

GjhcWD.png

GjhLlQ.png

Gj4f9U.png

MR原语: 相同的key为一组,调用一次reduce方法,迭代计算这一组数据

MapReduce工作流程

Gj5Vgg.png

MapReduce模型要点

Gj5cKH.png

案例

案例一

Gj7nzT.png

Gj72Sf.png

GjHgE9.png

Gjq4mD.png

GjvqL6.png

GjxUp9.png

案例二

Gjzi9J.png
GjzCh4.md.png

GvSbyn.md.png
GvSqLq.md.png

案例三 (序列化)

序列化和反序列化

Gvpr7V.md.png
GvpykT.md.png

Gv9oKs.md.png
Gv9Trn.md.png

GvCtMj.png

Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package serializable.mapreduce; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 v2
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable,
Employee> {
@Override
protected void map(LongWritable key1, Text value1,Context context)
throws IOException, InterruptedException {
//数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//分词
String[] words = data.split(",");
//创建员工对象
Employee e = new Employee();
//设置员工的属性
//员工号
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//职位
e.setJob(words[2]);
//老板号(注意:可能没有老板号)
try{
e.setMgr(Integer.parseInt(words[3]));
}catch(Exception ex){
//没有老板号
e.setMgr(-1);
}
//入职日期
e.setHiredate(words[4]);
//月薪
e.setSal(Integer.parseInt(words[5]));
//奖金(注意:奖金也可能没有)
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
//没有奖金
e.setComm(0);
}
//部门号
e.setDeptno(Integer.parseInt(words[7]));
//输出:k2 部门号 v2 员工对象
context.write(new IntWritable(e.getDeptno()), //员工的部门号
e); //员工对象
}
}
Reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package serializable.mapreduce; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 k4 v4
public class SalaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable,
IntWritable> {
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
throws IOException, InterruptedException {
//取出 v3 中的每个员工数据,进行工资求和
int total = 0;
for(Employee e:v3){
total = total + e.getSal();
}
//输出
context.write(k3, new IntWritable(total));
}
}
main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package serializable.mapreduce; 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SalaryTotalMain {
public static void main(String[] args) throws Exception {
// 创建一个 job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SalaryTotalMain.class);
//指定 job 的 mapper 和输出的类型 k2 v2
job.setMapperClass(SalaryTotalMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Employee.class);
//指定 job 的 reducer 和输出的类型 k4 v4
job.setReducerClass(SalaryTotalReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//指定 job 的输入和输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//执行任务
job.waitForCompletion(true);
}
}

MapReduce进阶

mapreduce的输入格式

物理分片

分割会严格按照设定大小128m或字数分割,可能会造成不符合逻辑的分割

逻辑分片

1
2
3
WordCount 程序的 main()方法中,可以通过下面的语句指定输入格式,也可以不
指定。如果不指定,默认就是 TextInputFormat。
job.setInputFormatClass(TextInputFormat.class);
inputFormat提供一下两个功能:
  • 数据切分,获得SplitInput(逻辑切片) ,FileInputFormat.getSplits()获取到。
  • 为Mapper提供输入数据

有多少个SplitInput,就有多少个Mapper
TextInputFormat 是默认InputFormat

JgmCzn.png

getSplits 方法负责将一个大数据逻辑分成许多片,但每个分片只是一个逻辑上的定义,仅是提供了一个如何将数据分片的方法,并没有物理上的独立存储

createRecordReader 方法返回一个 RecordReader 对象,实现了类似的迭代器功能,将某个InputSplit 解析成一个个 key/value 对

定位记录边界

为了能识别一条完整的记录,应该添加一些同步标示,如 TextInputFormat 的标示是换行符

InputFormat 接口实现类

JgnVXt.png
1
这里介绍一下 FileInputFormat。FileInputFormat 是所有文件作为其数据源的 InputFormat实现的基类,它的主要作用是指出作业的输入文件位置。因为作业的输入被设定为一组路径,这对指定作业输入提供了很强的灵活性。FileInputFormat 提供了如下 4 种静态方法来设定作业的输入路径。
1
2
3
4
5
6
7
8
这些方法都是 `FileInputFormat` 类中用来设置 MapReduce 作业输入数据路径的静态方法。它们的区别在于接受的参数类型和数量不同。

- `addInputPath(Job job, Path path)`:此方法用于向作业添加单个输入路径。`path` 参数是一个 `Path` 对象,表示要添加的输入路径。
- `addInputPaths(Job job, String commaSeparatedPaths)`:此方法用于向作业添加多个输入路径。`commaSeparatedPaths` 参数是一个字符串,其中包含多个以逗号分隔的输入路径。
- `setInputPaths(Job job, String commaSeparatedPaths)`:此方法用于设置作业的输入路径。`commaSeparatedPaths` 参数是一个字符串,其中包含多个以逗号分隔的输入路径。与 `addInputPaths` 方法不同,此方法会覆盖作业之前设置的所有输入路径。
- `setInputPaths(Job job, Path... inputPaths)`:此方法用于设置作业的输入路径。`inputPaths` 参数是一个可变长度的 `Path` 对象数组,表示要设置的输入路径。与 `setInputPaths(Job job, String commaSeparatedPaths)` 方法不同,此方法接受的是 `Path` 对象而不是字符串。

你可以根据实际需要选择使用哪种方法来设置 MapReduce 作业的输入路径。

mapreduce的输出格式

outputFormat接口

OutputFormat主要用于描述输出数据的格式,通过RecordWriter能够将用户提供的key/value对写入特定格式的文件中

(1)TextOutputFormt调用toString()方法把它们转换为字符串
(2)NullWritable来省略输出的key或value

outputFormat接口实现类

JgnRAO.png

排序排序是针对map输出里面的key,没会对value排序;map输出和reduce输入都有排序;作用是提高效率

分区Partition

Partition定义:

Mapper任务划分数据的过程称作Partition。
负责实现数据的类称作Partitioner,默认的分区是Hash分区 (Hash Partition)

Partition作用:

将map阶段产生的所有<key,value>对分配给不同的Reducer 处理,可以将Reduce阶段的处理负载进行分摊

Partition的数量决定Reducer的数量

Hash分区基本原理

计算某个值的hash值,如果结果相同,则放入同一个分区

Hash分区的作用:把数据打散进行存放,最终是为了避免热块

实例

Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package part; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k2 部门号 v2 员工
public class PartEmployeeMapper extends Mapper<LongWritable, Text, IntWritable,
Employee> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//分词
String[] words = data.split(",");
//创建员工对象
Employee e = new Employee();
//设置员工的属性
//员工号
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//职位
e.setJob(words[2]);
//老板号(注意:可能没有老板号)
try{
e.setMgr(Integer.parseInt(words[3]));
}catch(Exception ex){
//没有老板号
e.setMgr(-1);
}
//入职日期
e.setHiredate(words[4]);
//月薪
e.setSal(Integer.parseInt(words[5]));
//奖金(注意:奖金也可能没有)
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
//没有奖金
e.setComm(0);
}
//部门号
e.setDeptno(Integer.parseInt(words[7]));
//输出:k2 部门号 v2 员工对象
context.write(new IntWritable(e.getDeptno()), //员工的部门号
e); //员工对象
}
}
Reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package part; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class PartEmployeeReducer extends Reducer<IntWritable, Employee, IntWritable,
Employee> {
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
throws IOException, InterruptedException {
/*
* k3 部门号
* v3 部门的员工
*/
for(Employee e:v3){
context.write(k3, e);
}
}
}
Patitioner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package part; 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/*
* 建立自己的分区规则:根据员工的部门号进行分区
* 根据 Map 的输出 k2 v2
*/
public class MyEmployeeParitioner extends Partitioner<IntWritable, Employee>{
/*
* numPartition 参数:建立多少个分区
*/
@Override
public int getPartition(IntWritable k2, Employee v2, int numPartition) {
// 如何建立分区
if(v2.getDeptno() == 10){
//放入 1 号分区中
return 1%numPartition;
}else if(v2.getDeptno() == 20){
//放入 2 号分区中
return 2%numPartition;
}else{
//放入 0 号分区中
return 3%numPartition;
}
}
}
Main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package part; 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PartEmployeeMain {
public static void main(String[] args) throws Exception {
// 创建一个 job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(PartEmployeeMain.class);
//指定 job 的 mapper 和输出的类型 k2 v2
job.setMapperClass(PartEmployeeMapper.class);
job.setMapOutputKeyClass(IntWritable.class); //部门号
job.setMapOutputValueClass(Employee.class); //员工

//指定任务的分区规则
job.setPartitionerClass(MyEmployeeParitioner.class);
//指定建立几个分区
job.setNumReduceTasks(3);

//指定 job 的 reducer 和输出的类型 k4 v4
job.setReducerClass(PartEmployeeReducer.class);
job.setOutputKeyClass(IntWritable.class); //部门号
job.setOutputValueClass(Employee.class); //员工
//指定 job 的输入和输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//执行任务
job.waitForCompletion(true);
}
}

合并Combiner

减少Mapper输出到Reduce的数据量,缓解网络传输瓶颈,提高reducer的执行效率

需要注意的问题:一定要谨慎使用Combiner
有些情况不能使用Combiner —-> 如:求平均值
保证引入Combiner以后,不能改变原来的逻辑

image-20230605153950725
image-20230605153723386

1
2
3
若要引入 Combiner,只需要在 WordCount 程序的 main()方法中,增加下面语句即可。
//引入一个 Combiner,是一种特殊的 Reducer
job.setCombinerClass(WordCountReducer.class);

MapReduce的工作机制

MapReduce 作业的运行机制

1
2
MapReduce
在YARN中的调度以及 MapReduce 中的 Shuffle 是 MapReduce 中的重要内容。下面详细介绍。
1
2
3
4
5
6
本小节将揭示Hadoop运行作业时所采取的措施,整个过程描述如图5-11所示。在最高层,有以下5个独立的实体。
● 客户端,提交MapReduce作业。
● YARN资源管理器(ResourceManager),负责协调集群上计算机资源的分配。
● YARN节点管理器(NodeManager),负责启动和监视集群中主机上的计算容器(Container)。
● MapReduce的Application Master(简写为MRAppMaster),负责协调运行 MapReduce作业的任务。它和MapReduce任务在容器中运行,这些容器由资源管理器分节点管理器进行管理。
● 分布式文件系统(一般为HDFS),用来与其他实体间共享作业文件。

image-20230605114218941

1
2
3
4
5
6
7
8
9
10
11
12
13
14
MapReduce 在 YARN 中的运行过程大致分为如下 11 步,如图 5-11 所示。
(1)Client请求执行Job。即调用 Job 的 waitForCompletion(),包括提交 Job,并轮询获取、打印进度。
(2)向 ResourceManager 请求获取 MapReduce 的 JobID。
(3)计算输入分片。将运行作业所需的资源(jar 文件、配置文件、计算所得分片)保存在
HDFS下一个以JobID 命名的目录下。
(4)调用 ResourceManager的submitApplication()提交作业,同时传入(3)的资源。
(5)ResourceManager调度器分配一个容器,ResourceManager在NodeManager的管理下,在容器中启动MRAppMaster。
(6)MRAppMaster对作业初始化。
(7)MRAppMaster获取从HDFS中输入的分片,对每个分片创建一个Map Task和多个 Reduce Task对象。各个Task ID在此时分配。
(8)MRAppMaster决定如何运行作业的各个任务。如果作业很小,就选择在同一个JVM上
运行,否则,向ResourceManager请求新的容器。
(9)当ResourceManager分配了容器,MRAppMaster通过节点管理器启动容器。
(10)从HDFS获取作业配置、jar文件、分片文件,并资源本地化。
(11)运行Map任务和Reduce任务。

进度和状态的更新

1
2
3
4
5
6
7
8
 Map 任务或 Reduce 任务运行时,子进程和自己的 Application Master 进行通信。每隔 3 秒
钟,任务通过向自己的 Application Master 报告进度和状态(包括计数器),Application Master 会
形成一个作业的汇聚视图(Aggregate View)。
资源管理器的界面显示了所有运行中的应用程序,并且分别有链接指向这些应用各自的
Application Master 的界面,这些界面展示了 MapReduce 作业的更多细节,包括其进度。
在作业期间,客户端每秒钟轮询一次 Application Master 以接收最新状态(轮询间隔通过
mapreduce.client.progressmonitor.pollinterval 设置)。客户端也可以使用 Job 的 getStatus()方法得到
一个 JobStatus 的实例,后者包含作业的所有状态信息。

image-20230605115333371

1
2
3
4
状态更新在 MapReduce 系统中的传递流程可总结为如下几步。
(1)Map 任务或 Reduce 任务运行时,向自己的 MRAppMaster 报告进度和状态。
(2)MRAppMaster 形成一个作业的汇聚视图。
(3)客户端每秒钟轮询一次 MRAppMaster 获取最新状态。

Shuffle

1
2
3
4
MapReduce 确保每个 Reducer 的输入都是按键排序的。系统执行排序、将 Mapper 输出作为
输入传给 Reducer 的过程称为 Shuffle。从许多方面来看,Shuffle 是 MapReduce 的“心脏”,是
MapReduce 的最核心的部分,也被称为“奇迹发生的地方”。
图 5-13 描述了从 Map 到 Reduce 的过程。概括起来有如下这些关键的步骤。

image-20230605153128374

1.Map 端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(1)写入缓冲区。map 函数开始产生输出时,并不是简单地将它写入磁盘,而是先写入一个
缓冲区。在默认情况下,缓冲区的大小为 100MB。这个值可以通过改变 mapreduce.task.io.sort.mb
来调整。
(2)溢写。一旦缓冲内容达到阈值(mapreduce.map.sort.spill.percent,默认为 0.80,或 80%),
一个后台线程便开始把内容溢写(spill)入磁盘。在溢写入磁盘的过程中,Map 输出继续写入缓
冲区,但如果在此期间缓冲区被填满,Map 会被阻塞直到写磁盘的过程完成。溢写过程按轮询方
式将缓冲区的内容写到mapreduce.cluster.local.dir属性在作业特定子目录下指定的目录中。
(3)分区。在写磁盘之前,线程首先根据数据最终要传给的 Reducer 把数据划分成相应的分
区(Partition)。在每个分区中,后台线程按键进行内存排序。如果有一个 Combiner,它就在排序
后的输出上运行。运行 Combiner 使得 Map 输出结果更紧凑,因此可减少写入磁盘的数据和传递
给 Reducer 的数据。
(4)合并、排序。每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(Spill File),因此
在 Map 任务写完其最后一个输出记录之后,会有几个溢出文件。在任务完成之前,溢出文件被合
并成一个已分区且已排序的输出文件。配置属性 mapreduce.task.io.sort.factor 控制一次最多能合并
多少流,默认值是 10。

2.Reduce 端

1
2
3
4
(1)复制文件。Reducer通过HTTP得到输出文件的分区。如果Map输出相当小,会被复制到Reduce任务JVM的内存。否则,Map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapreduce.reduce.shuffle.merge.percent决定)或达到Map输出阈值(由mapreduce.reduce.merge.inmem.threshold 控制),则合并后溢写入磁盘。如果指定Combiner,则在合并期间运行它以降低写入磁盘的数据量。
(2)小文件合并。随着磁盘上副本的增多,后台线程会将它们合并为更大的、排好序的文件。这为后面的合并节省一些时间。注意,为了合并,压缩的Map输出(通过Map任务)必须在内存中被解压缩。
(3)Reduce合并。复制完所有Map输出后,Reduce任务进入合并阶段,这个阶段将合并Map输出,维持其顺序排序。这是循环进行的。
(4)直接把数据输入Reduce()函数,对已排序输出的每个键调用Reduce()函数。阶段的输出直接写入输出文件系统(一般为HDFS)。

如果今天后悔昨天,那么明天就会后悔今天。

本文标题:hadoop理论课-第五章Mapreduce

文章作者:TTYONG

发布时间:2020年04月13日 - 16:04

最后更新:2023年07月04日 - 11:07

原始链接:http://tianyong.fun/hadoop%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%8A%80%E6%9C%AF%E4%B8%8E%E5%BA%94%E7%94%A8-%E7%AC%AC%E4%BA%94%E7%AB%A0Mapreduce(%E7%90%86%E8%AE%BA%E8%AF%BE).html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%