大数据开发工程师-第六周 第一章 剖析小文件问题与企业级解决方案


第六周 第一章 剖析小文件问题与企业级解决方案

MapReduce性能优化

1
2
3
4
5
6
现在大家已经掌握了MapReduce程序的开发步骤,注意了,针对MapReduce的案例我们并没有讲太多,主要是因为在实际工作中真正需要我们去写MapReduce代码的场景已经是凤毛麟角了,因为后面我们会学习一个大数据框架Hive,Hive支持SQL,这个Hive底层会把SQL转化为MapReduce执行,不需要我们写一行代码,所以说工作中的大部分需求我们都使用SQL去实现了,谁还苦巴巴的来写代码啊,一行SQL能抵你写的几十行代码,你还想去写MapReduce代码吗,肯定不想了。
但是MapReduce代码的开发毕竟是基本功,所以前面我们也详细的讲解了它的开发流程。
虽然现在MapReduce代码写的很少了,但是针对MapReduce程序的性能优化是少不了的,面试也是经常会问到的,所以下面我们就来分析一下MapReduce中典型的性能优化场景

第一个场景是:小文件问题
第二个场景是:数据倾斜问题

小文件问题

1
2
3
先一个一个来,不要着急,我们先看小文件问题
咱们前面分析过,Hadoop的HDFS和MapReduce都是针对大数据文件来设计的,在小文件的处理上不但效率低下,而且十分消耗内存资源
针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然存储了很多个文件,但是文件的体积并不大,这样就没有意义了。针对MapReduce而言,每一个小文件都是一个Block,都会产生一个InputSplit,最终每一个小文件都会产生一个map任务,这样会导致同时启动太多的Map任务,Map任务的启动是非常消耗性能的,但是启动了以后执行了很短时间就停止了,因为小文件的数据量太小了,这样就会造成任务执行消耗的时间还没有启动任务消耗的时间多,这样也会影响MapReduce执行的效率。
1
2
3
4
5
6
  针对这个问题,解决办法通常是选择一个容器,将这些小文件组织起来统一存储,HDFS提供了两种类型的容器,分别是SequenceFile和MapFile

SequeceFile是Hadoop提供的一种二进制文件,这种二进制文件直接将<key, value>对序列化到文件中。
一般对小文件可以使用这种文件合并,即将小文件的文件名作为key,文件内容作为value序列化到大文件中
但是这个文件有一个缺点,就是它需要一个合并文件的过程,最终合并的文件会比较大,并且合并后的文件查看起来不方便,必须通过遍历才能查看里面的每一个小文件
所以这个SequenceFile其实可以理解为把很多小文件压缩成一个大的压缩包了。

SequenceFile

创建SequenceFile

代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
下面我们来具体看一下如何生成SequenceFile
生成SequenceFile需要开发代码

package com.imooc.mc;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;


/**
* 小文件解决方案之SequenceFile
* Created by xuwei
*/
public class SmallFileSeq {
public static void main(String[] args) throws Exception{
//生成SequenceFile文件
write("D:\\hadoopTestData\\", "/seqFile");
//读取SequenceFile文件
read("/seqFile");

}
/**
* 生成SequenceFile文件
* @param inputDir 输入目录-windows目录
* @param outputFile 输出文件-hdfs文件
* @throws Exception
*/
private static void write(String inputDir, String outputFile) throws Exception{
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
//删除输出文件
fileSystem.delete(new Path(outputFile),true);

//构造opts数组,有三个元素
/*
第一个是输出路径
第二个是key类型
第三个是value类型
*/
SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
SequenceFile.Writer.file(new Path(outputFile)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class)
};

//创建一个writer实例
SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);

//指定要压缩的文件的目录
File inputDirPath = new File(inputDir);
if(inputDirPath.isDirectory()){
//
File[] files = inputDirPath.listFiles();
//
for(File file: files){
//获取文件全部内容
String content = FileUtils.readFileToString(file, "UTF-8");
//
String fileName = file.getName();
//文件名作为key
Text key = new Text(fileName);
//文件内容作为value
Text value = new Text(content);
//
writer.append(key,value);
}
}
writer.close();
}

/**
* 读取SequenceFile文件
* @param inputFile SequenceFile文件路径
* @throws Exception
*/
private static void read(String inputFile) throws Exception{
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//创建阅读器
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)) );
Text key = new Text();
Text value = new Text();
//循环读取数据
while(reader.next(key, value)){
//输出文件名称
System.out.println("文件名:"+key.toString());
//输出文件的内容
System.out.println("文件内容:"+value.toString()+" ");
}
reader.close();
}
}
1
2
执行代码中的write方法,可以看到在HDFS上会产生一个/seqFile文件,这个文件就是最终生成的大文件
执行代码中的read方法,可以输出小文件的名称和内容
读SquenceFile
1
只能通过遍历的方法去读
异常
1
2
3
java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

// 前往别加载错类,搞死个人
1
[WARN] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

MapFile

1
2
3
接下来我们来看一下MapFile
MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data
index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.imooc.mc;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;

public class SmallFileMap {
public static void main(String[] args) throws Exception{
//生成MapFile文件
write("D:\\hadoopTestData\\", "/mapFile");
//读取MapFile文件
read("/mapFile");
}
/**
* 生成SequenceFile文件
* @param inputDir 输入目录-windows目录
* @param outputDir 输出文件-hdfs文件
* @throws Exception
*/
private static void write(String inputDir, String outputDir) throws Exception{
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
//删除输出文件
fileSystem.delete(new Path(outputDir),true);

//构造opts数组,有两个元素
/*
第一个是key类型
第二个是value类型
*/

SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{
MapFile.Writer.keyClass(Text.class),
MapFile.Writer.valueClass(Text.class)
};

//创建一个writer实例
MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts);

//指定要压缩的文件的目录
File inputDirPath = new File(inputDir);
if(inputDirPath.isDirectory()){
File[] files = inputDirPath.listFiles();
for(File file: files){
//获取文件全部内容
String content = FileUtils.readFileToString(file, "UTF-8");
String fileName = file.getName();
//文件名作为key
Text key = new Text(fileName);
//文件内容作为value
Text value = new Text(content);
writer.append(key,value);
}
}
writer.close();
}

private static void read(String inputDir) throws Exception{
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//创建阅读器
MapFile.Reader reader = new MapFile.Reader(new Path(inputDir), conf);
Text key = new Text();
Text value = new Text();
//循环读取数据
while(reader.next(key, value)){
//输出文件名称
System.out.println("文件名:"+key.toString());
//输出文件的内容
System.out.println("文件内容:"+value.toString()+" ");
}
reader.close();
}
}
1
2
执行代码中的write方法,可以看到在HDFS上会产生一个/mapFile目录,这个目录里面有两个文件,一个index索引文件,一个data数据文件
执行代码中的read方法,可以输出小文件的名称和

通过MapReduce读取SequenceFile

1
2
3
4
5
6
下面我们来看一个案例
我们来使用SequenceFile实现小文件的存储和计算
小文件的存储刚才我们已经通过代码实现了,接下来我们要实现如何通过MapReduce读取SequenceFile
咱们之前的代码默认只能读取普通文本文件,针对SequenceFile是无法读取的
那该如何设置才能让mapreduce可以读取SequenceFile呢?
很简单,只需要在job中设置输入数据处理类就行了,默认情况下使用的是TextInputFormat
1
2
3
job.setInputFormatClass(SequenceFileInputFormat.class)

// 在namenode高级那篇文章里数据逻辑分片也用到过
1
2
3
4
创建一个新的类WordCountJobSeq
注意修改两个地方
1. 修改job中的设置输入数据处理类
2. 修改map中k1的数据类型为Text类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.imooc.mc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sound.midi.Sequence;
import java.io.IOException;

/**
* 需求:读取SequenceFile文件
* Created by xuwei
*/

public class WordCountJobSeq {
// 静态内部类
// map阶段
// 注意:myMapper和myReducer类可以提到外面去写,这里只是为了方便学习
public static class myMapper extends Mapper<Text, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(myMapper.class);
/**
* 需要实现map函数
* 这个函数就是可以接受<k1,v1>, 产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(Text k1, Text v1, Context context)
throws IOException, InterruptedException {
//输出k1,v1的值
System.out.println("<k1, v1>: <"+k1.toString()+", "+v1.toString()+">");
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);
}
//super.map(key, value, context);
}
}

/**
* reduce阶段
*/
public static class myReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(myReducer.class);
/**
* 对<k2, {v2...}>的数据进行累加求和,生成<k3,v3>
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
//创建一个sum变量,保存v2s的和
long sum = 0L;
for(LongWritable v2:v2s){
logger.info("<k2, v2>: <"+k2.toString()+", "+v2.get()+">");
//System.out.println("<k2, v2>: <"+k2.toString()+", "+v2.get()+">");
sum += v2.get();
}
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
logger.info("<k3, v3>: <"+k3.toString()+", "+v3.get()+">");
//System.out.println("<k3, v3>: <"+k3.toString()+", "+v3.get()+">");
context.write(k3, v3);
// super.reduce(key, values, context);
}
}
/**
* 组装job=map+reduce
*/
public static void main(String[] args){
try {
if(args.length!=2){
System.exit(100);
System.out.println("缺少路径参数!!!");
}
// 指定Job需要的配置参数
Configuration conf = new Configuration();
// 创建一个Job
Job job = Job.getInstance(conf);
// 注意:这一行必须设置,否者在集群中执行时找不到WordCountJob这个类
job.setJarByClass(WordCountJobSeq.class);

// 指定输入路径,可以是文件也可以是目录(目录里只有一个文件时可以); 注意FileInputFormat别选成hadoop1.x的了
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 指定输出路径(只能是hdfs上一个不存在的目录); 注意FileOutFormat别选成hadoop1.x的了
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//设置输入数据处理类
job.setInputFormatClass(SequenceFileInputFormat.class);
// 指定map相关代码
job.setMapperClass(myMapper.class);
// 指定k2类型
job.setMapOutputKeyClass(Text.class);
// 指定v2类型
job.setMapOutputValueClass(LongWritable.class);

//指定reduce相关代码
job.setReducerClass(myReducer.class);
// 指定k3类型
job.setOutputKeyClass(Text.class);
// 指定v3类型
job.setOutputValueClass(LongWritable.class);

// 提交job
job.waitForCompletion(true);

}catch(Exception e){
e.printStackTrace();
}

}
}
1
2
重新编译打包执行
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobSeq /seqFile /out10
1
2
3
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out10/*
hello 10
you 10
1
2
此时到yarn的web界面上查看map任务的个数,发现只有1个,说明这样是生效的。
查看map任务的日志,查看打印的k1,v1日志信息

image-20230316001044238

image-20230316001505836

异常
1
2
3
4
5
6
7
1.在hdfs上创建的seqFile和mapFile都看不见,但IDEA上read方法却正确执行

conf.set("fs.defaultFS", "hdfs://bigdata01:9000") // FS写成了Fs

2.org.apache.hadoop.security.AccessControlException: Permission denied: user=***

https://blog.csdn.net/diqijiederizi/article/details/82753573

HDFS归档机制

image-20230315163947823

image-20230315164701053

image-20230315170322098

image-20230315170351685

image-20230315171021672

1
归档时会执行mapreduce程序

image-20230315171130067

1
程序是用index里的索引去part-0里找对应文件

image-20230315171357058

image-20230315171521646

image-20230315171548618

image-20230315171714254

image-20230315171835250

1
archieve在mapreduce中的应用

image-20230315172213685

image-20230315172531409


1
只有map的话,这里显示的是m;有reduce的话显示的是r

image-20230315220258143

image-20230315221132889

本文标题:大数据开发工程师-第六周 第一章 剖析小文件问题与企业级解决方案

文章作者:TTYONG

发布时间:2022年02月13日 - 14:02

最后更新:2023年06月04日 - 23:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%85%AD%E5%91%A8-%E7%AC%AC%E4%B8%80%E7%AB%A0-%E5%89%96%E6%9E%90%E5%B0%8F%E6%96%87%E4%BB%B6%E9%97%AE%E9%A2%98%E4%B8%8E%E4%BC%81%E4%B8%9A%E7%BA%A7%E8%A7%A3%E5%86%B3%E6%96%B9%E6%A1%88.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%