林子雨 大数据技术原理与应用-第7章 mapreduce


第7章 mapreduce

概述

分布式并行编程

1
2
3
4
•“摩尔定律”, CPU性能大约每隔18个月翻一番
•从2005年开始摩尔定律逐渐失效(大数据摩尔定律:每年按50%增长) ,需要处理的数据量快速增加,人们开始借助于分布式并行编程来提高程序性能
•分布式程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力
•谷歌公司最先提出了分布式并行编程模型MapReduce,Hadoop MapReduce是它的开源实现,后者比前者使用门槛低很多
1
问题:在MapReduce出现之前,已经有像MPI这样非常成熟的并行计算框架了,那么为什么Google还需要MapReduce?MapReduce相较于传统的并行计算框架有什么优势?
image-20230303142923582

MapReduce模型简介

1
2
3
4
5
6
•MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce
•编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算
•MapReduce采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理
•MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销
•MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。Master上运行JobTracker,Slave上运行TaskTracker
•Hadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写

Map和Reduce函数

image-20230303144723603

MapReduce体系结构

1
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task
image-20230303145010046

Client

1
2
•用户编写的MapReduce程序通过Client提交到JobTracker端
•用户可通过Client提供的一些接口查看作业运行状态

JobTracker

1
2
3
•JobTracker负责资源监控和作业调度
•JobTracker监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点
•JobTracker会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(Task Scheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源
image-20230303150311444

TaskTracker

1
2
•TaskTracker会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)
•TaskTracker使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Map slot 和Reduce slot两种,分别供MapTask和Reduce Task使用

Task

1
Task分为Map Task 和Reduce Task 两种,均由TaskTracker 启动
image-20230303150354031

MapReduce工作流程

工作流程概述

image-20230303153107874

1
2
3
4
•不同的Map任务之间不会进行通信
•不同的Reduce任务之间也不会发生任何信息交换
•用户不能显式地从一台机器向另一台机器发送消息
•所有的数据交换都是通过MapReduce框架自身去实现的

MapReduce各个执行阶段

image-20230303153150037
1
2
3
split:是逻辑分割,如起点,长度
RR(Record Reader):按照split切分的起点和长度读取
shuffle:分区、排序、合并、归并过程

关于Split(分片)

image-20230303153252421
1
HDFS以固定大小的block为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。

Map任务的数量

1
•Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块

image-20230303153342357

Reduce任务的数量

1
2
•最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
•通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)

Shuffle过程详解

Shuffle过程简介

image-20230303161924888

Map端的Shuffle过程

1
2
3
4
5
6
7
8
9
10
11
•每个Map任务分配一个缓存
•MapReduce默认100MB缓存
•设置溢写比例0.8
•分区默认采用哈希函数
•排序是默认的操作
•排序后可以合并(Combine)(用户定义了就执行,不是必须的)
•合并不能改变最终结果(注意事项) 如求和,最大值
•在Map任务全部结束之前进行归并(对之前多次溢写生成的磁盘文件,合并成大文件,文件里的数据是分区,排序了的)
•归并得到一个大的文件,放在本地磁盘
•文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要
•JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据
image-20230303162007805
1
2
合并(Combine)和归并(Merge)的区别:
两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>

Reduce端的Shuffle过程

1
2
3
4
•Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,若完成,则领取数据
•Reduce领取数据先放入缓存,来自不同Map机器,先归并,再合并,写入磁盘
•多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
•当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce
image-20230303162053011

MapReduce应用程序执行过程

image-20230303162200768

1
中间的输出是输出到磁盘不是hdfs!!!

实例分析:WordCount

WordCount程序任务

image-20230303221931969 image-20230303221943836

WordCount设计思路

1
2
3
• 首先,需要检查WordCount程序任务是否可以采用MapReduce来实现
• 其次,确定MapReduce程序的设计思路
• 最后,确定MapReduce程序的执行过程

一个WordCount执行过程的实例

image-20230303222506161

用户没有定义Combiner时

image-20230303222631369

用户有定义Combiner时

image-20230303222701557

MapReduce的具体应用

1
2
3
4
5
MapReduce可以很好地应用于各种计算问题
• 关系代数运算(选择、投影、并、交、差、连接)
• 分组与聚合运算
• 矩阵-向量乘法
• 矩阵乘法

用MapReduce实现关系的自然连接

image-20230303224301191

1
2
3
4
• 假设有关系R(A,B)和S(B,C),对二者进行自然连接操作
• 使用Map过程,把来自R的每个元组<a,b>转换成一个键值对<b, <R,a>>,其中的键就是属性B的值。把关系R包含到值中,这样做使得我们可以在Reduce阶段,只把那些来自R的元组和来自S的元组进行匹配。类似地,使用Map过程,把来自S的每个元组<b,c>,转换成一个键值对<b,<S,c>>
• 所有具有相同B值的元组被发送到同一个Reduce进程中,Reduce进程的任务是,把来自关系R和S的、具有相同属性B值的元组进行合并
• Reduce进程的输出则是连接后的元组<a,b,c>,输出被写到一个单独的输出文件中
image-20230303224345042

MapReduce编程实践

任务要求

1
2
3
文件A的内容如下:
China is my motherland
I love China
1
2
文件B的内容如下:
I am from China
1
2
3
4
5
6
7
8
9
期望结果如右侧所示:
I 2
is 1
China 3
my 1
love 1
am 1
from 1
motherland 1

编写Map处理逻辑

1
2
3
4
•Map输入类型为<key,value>
•期望的Map输出类型为<单词,出现次数>
•Map输入类型最终确定为<Object,Text>
•Map输出类型最终确定为<Text,IntWritable>
1
2
3
4
5
6
7
8
9
10
11
12
13
public static class MyMapper extends Mapper<Object,Text,Text,IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws
IOException,InterruptedException{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word,one);
}
}
}

编写Reduce处理逻辑

1
2
3
4
5
6
7
8
9
•在Reduce处理数据之前,Map的结果首先通过Shuffle阶段进行整理
•Reduce阶段的任务:对输入数字序列进行求和
•Reduce的输入数据为<key,Iterable容器>
Reduce任务的输入数据:
<”I”,<1,1>>
<”is”,1>
……
<”from”,1>
<”China”,<1,1,1>>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static class MyReducer extends 
Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable>
values, Context context) throws IOException,InterruptedException{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}

编写main方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration(); //程序运行时参数
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if (otherArgs.length != 2)
{ System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf,"word count"); //设置环境参数
job.setJarByClass(WordCount.class); //设置整个程序的类名
job.setMapperClass(MyMapper.class); //添加MyMapper类
job.setReducerClass(MyReducer.class); //添加MyReducer类
job.setOutputKeyClass(Text.class); //设置输出类型
job.setOutputValueClass(IntWritable.class); //设置输出类型
FileInputFormat.addInputPath(job,new Path(otherArgs[0])); //设置输入文件
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); //设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.io.IOException; 
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount{
public static class MyMapper extends Mapper<Object,Text,Text,IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word,one);
}
}
}
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf,"word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}

编译打包代码以及运行程序

1
2
3
4
5
实验步骤:
•使用java编译程序,生成.class文件
•将.class文件打包为jar包
•运行jar包(需要启动Hadoop)
•查看结果
1
2
3
4
5
6
7
8
Hadoop 2.x 版本中的依赖 jar
Hadoop 2.x 版本中 jar 不再集中在一个 hadoop-core*.jar 中,而是分成多个 jar,如使用 Hadoop 2.6.0 运行 WordCount 实例至少需要如下三个
jar:
•$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar
•$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduceclient-core-2.6.0.jar
•$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar

通过命令 hadoop classpath 可以得到运行 Hadoop 程序所需的全部classpath信息
1
2
3
4
5
6
将 Hadoop 的 classhpath 信息添加到 CLASSPATH 变量中,在 ~/.bashrc 
中增加如下几行:
export HADOOP_HOME=/usr/local/hadoop export
CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH
执行 source ~/.bashrc 使变量生效,接着就可以通过 javac 命令编译
WordCount.java
image-20230304004002960

如何使用Eclipse编译运行MapReduce程序?

Hadoop中执行MapReduce任务的几种方式

1
2
3
4
5
6
•Hadoop jar
•Pig
•Hive
•Python
•Shell脚本
在解决问题的过程中,开发效率、执行效率都是要考虑的因素,不要太局限于某一种方法

本文标题:林子雨 大数据技术原理与应用-第7章 mapreduce

文章作者:TTYONG

发布时间:2023年02月28日 - 22:02

最后更新:2023年07月04日 - 11:07

原始链接:http://tianyong.fun/%E6%9E%97%E5%AD%90%E9%9B%A8-%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%8A%80%E6%9C%AF%E5%8E%9F%E7%90%86%E4%B8%8E%E5%BA%94%E7%94%A8-%E7%AC%AC7%E7%AB%A0-mapreduce.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%