大数据开发工程师-第八周 第6章 Hive技巧与核心复盘


第6章-Hive技巧与核心复盘

一个SQL语句分析

1
2
3
4
5
6
7
8
9
10
11
12
SELECT a.Key
, SUM(a.Cnt) AS Cnt
FROM (
SELECT Key, COUNT(*) AS Cnt
FROM TableName
GROUP BY Key,
CASE
WHEN Key = 'KEY001' THEN Hash(Random()) % 50
ELSE 0
END
) a
GROUP BY a.Key;
1
2
解释:这个SQL其实是一个解决数据倾斜的SQL
先看里面的select语句,里面的select语句其实是根据key进行分组,但是这个key对应的数据存在数据倾斜,key=KEY001的数据占了整份数据的90%,所以直接针对key进行分组肯定会出现数据倾斜,应该计算效率,所以在这里就实现了曲线救国,先把key=KEY001的数据打散,分成50份,进行局部聚合最后再通过外面的select进行全局的聚合,这样就可以显著提高计算效率

Hive的Web工具-HUE

1
2
3
4
5
6
7
8
Hive本身提供了Hwi web界面,但使用起来不方便

CHD的hue界面的使用
Hue-非技术人员操作Hive的利器
在Hive中提交任务,查看任务执行结果

官网已经有了配置好的hue案例
https://gethue.com/

image-20230322103321276

【扩展内容】Hive数据倾斜的解决方案

1
2
3
4
5
6
7
8
可能会触发Hive数据倾斜的几种情况

关键字 情形 后果
join 大表与小表,小表中的key集比较集中 分发到某一个或几个Reduce上的数据远高于平均值

join 大表与大表,但是join中指定的字段0值或空值过多 这些空值都由一个reduce处理,非常慢

group by group by维度过小,处理的数量过多 处理某值的reduce非常耗时
1
2
3
4
5
6
原因:
1)、key分布不均匀
2)、业务数据本身的特性
3)、建表时考虑不周
4)、某些SQL语句本身就有数据倾斜表现:
任务进度长时间维持在99%(或100务未完成。因为其处理的数据量和其单一reduce的记录数与平均记录数差数据倾斜的解决方案
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
表现:
任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。 最长时长远大于平均时长。

数据倾斜的解决方案

1.参数调节:
hive.map.aggr=true
Map端部分聚合,相当于Combiner
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为true,生成的查询计划会有两个MR Job。第一个 MR Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

2.SQL语句调节:
大小表Join:
使用map join让小的维度表(1000条以下的记录条数) 先进内存,在map端完成join操作。正常情况下join是需要在reduce端执行的,通过map join可以实现在map端执行join操作,这样可以避免在shuffle的时候造成数据倾斜。
大表Join大表:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

count distinct大量相同特殊值
count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

针对去重求和的需求还可以这样做:
采用sum() group by的方式来替换count(distinct)完成计算。

特殊情况特殊处理:
在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

1 常见数据压缩格式的使用

1
2
3
4
5
6
7
本次Hive扩展内容部分主要讲解Hive的数据存储格式,因为咱们前面默认使用的都是TextFile格式的数据,这种格式的数据在存储层面占用的空间比较大,影响存储能力,也影响计算效率。
所以为了提高Hive中数据的存储能力,以及计算性能,我们需要针对Hive的数据存储格式进行扩展。

在具体讲解Hive中的数据存储格式之前,我们需要先来了解一下MapReduce中的数据压缩格式。
因为数据存储格式想要发挥最大性能,还需要配合数据压缩格式一起使用。

下面我们先来学习一下数据压缩格式。

数据压缩格式

MapReduce中常见的数据压缩格式

1
2
3
4
5
6
7
8
9
10
MapReduce中常见的数据压缩格式主要包括下面这些:

DEFLATE
Gzip
Bzip2
Lz4
Lzo
Snappy

在这有一点需要注意:在Hadoop 3.x版本中,默认是没有集成Lzo压缩格式的,如果想要使用需要自己安装,其他的压缩格式默认都是支持的。
1
2
3
4
5
6
7
8
9
10
11
12
13
我们可以在Hadoop节点上执行hadoop checknative命令来确认一下。

[root@bigdata04 ~]# hadoop checknative
......
Native library checking:
hadoop: true /data/soft/hadoop-3.2.0/lib/native/libhadoop.so
zlib: true /lib64/libz.so.1
zstd : false
snappy: true /lib64/libsnappy.so.1
lz4: true revision:10301
bzip2: true /lib64/libbz2.so.1
openssl: true /lib64/libcrypto.so
ISA-L: false libhadoop was built without ISA-L support
1
2
3
这里面主要有zlib、snappy、lz4、bzip2。

注意:DEFLATE压缩格式底层使用的是zlib,Gzip是对DEFLATE进行了封装,所以只有lzo没有集成,其他的压缩格式都是可以正常使用的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
DEFLATE
DEFLATE是同时使用了LZ77算法与哈夫曼编码(Huffman Coding)的一个无损数据压缩算法,DEFLATE压缩与解压的源代码可以在自由、通用的压缩库Zlib上找到,Zlib官网:http://www.zlib.net/ jdk中对Zlib压缩库提供了支持,压缩类Deflater和解压类Inflater,Deflater和Inflater都提供了Native方法。

Gzip
Gzip的实现算法还是DEFLATE,只是在DEFLATE格式上增加了文件头和文件尾,同样JDK也对Gzip提供了支持,分别是GZIPOutputStream和GZIPInputStream类,同样可以发现GZIPOutputStream是继承于DeflaterOutputStream的,GZIPInputStream继承于InflaterInputStream,并且可以在源码中发现writeHeader和writeTrailer方法。

Bzip2
Bzip2是Julian Seward开发并按照自由软件/开源软件协议发布的数据压缩算法及程序。Seward在1996年7月第一次公开发布了Bzip2 0.15版,在随后几年中这个压缩工具稳定性得到改善并且日渐流行,Seward在2000年晚些时候发布了1.0版。Bzip2比传统的Gzip的压缩效率更高,但是它的压缩速度较慢。

Lz4
Lz4是一种无损数据压缩算法,着重于压缩和解压缩速度。

Lzo
Lzo是致力于解压速度的一种数据压缩算法,Lzo是Lempel-Ziv-Oberhumer的缩写,这个算法是无损算法。

Snappy
Snappy(以前称Zippy)是Google基于LZ77的思路用C++语言编写的快速数据压缩与解压程序库,并在2011年开源。它的目标并非最大压缩率或与其他压缩程序库的兼容性,而是非常高的速度和合理的压缩率。
1
在这里我们主要分析一下这几种压缩格式的自身特点和性能

image-20230617224343135

1
2
3
4
5
6
7
8
9
10
11
主要针对压缩格式的文件扩展名、是否可切分、压缩比、压缩速度和解压速度这几项进行对比。

文件扩展名:表示压缩后的数据文件的后缀名称。

是否可切分:表示压缩后的数据文件在被MapReduce读取的时候,是否会产生多个InputSplit。如果这个压缩格式产生的文件不可切分,那也就意味着,无论这个压缩文件有多大,在MapReduce中都只会产生1个Map任务。如果压缩后的文件不大,也就100M左右,这样对性能没有多大影响。但是如果压缩后的文件比较大,达到了1个G,由于不可切分,这样只能使用1个Map任务去计算,性能就比较差了,这个时候就没有办法达到并行计算的效果了。所以是否可切分这个特性是非常重要的,特别是当我们无法控制单个压缩文件大小的时候。

压缩比:表示压缩格式的压缩效果,压缩比越高,说明压缩效果越好,对应产生的压缩文件就越小。如果集群的存储空间有限,则需要重点关注压缩比,这个时候需要选择尽可能高的压缩比。

压缩速度:表示将原始文件压缩为指定压缩格式消耗的时间。压缩功能消耗的时间会体现在任务最终消耗的时间里面,所以这个指标也需要重点考虑。

解压速度:表示将指定压缩格式的数据文件解压为原始文件消耗的时间。因为MapReduce在使用压缩文件的时候需要先进行解压才能使用,解压消耗的时间也会体现在任务最终消耗的时间里面,所以这个指标也需要重点考虑。
1
2
3
4
5
6
7
8
9
针对表格中的这几种压缩格式,他们的文件扩展名还是很容易区分的,基本上都是以压缩格式名作为后缀,这个主要是为了后期能够通过文件后缀名快速区分出来使用的哪种压缩格式。

针对是否可切分,主要是Bzip2可以原生支持切分,还有一个Lzo通过给压缩数据建立索引也可以支持切分。

注意:这个表中所说的是否可切分是针对TextFile文件,也就是普通文本文件而言的。因为针对某一些特殊的文件格式,结合不可切分的压缩格式之后,依然是可以支持切分的,这是由于这些特殊文件格式自身的特性决定的,和压缩格式的特性没有关系,这个在后面讲Hive中的数据格式的时候会详细分析。

针对压缩比:Bzip2格式的压缩比是比较高的,压缩效果是最好的,也就是说针对相同大小的文件,使用Bzip2压缩后的文件是最小的。

针对压缩速度和解压速度:Bzip2的压缩速度和解压速度都是最低的,其实也是可以理解的,想要压缩效果好,对应的压缩和解压肯定会多消耗一些时间。
1
我们平时在使用windows中的压缩工具的时候其实也可以选择使用速度最快或者体积最小,大家在下面可以试验一下,如果选择体积最小,肯定速度就比较慢了。

image-20230617224725867

数据压缩格式选择建议

1
2
3
4
5
6
7
结合前面分析的数据压缩格式的特点进行考虑,Bzip2这个压缩格式虽然支持切分,并且压缩效果也是最好的,但是并不一定在所有场景下都是最优的选择,因为它的压缩和解压是最慢的。

那么我们应该如何选择压缩格式呢?
下面是几条选择建议,可以作为参考:
-使用包含压缩并且支持切分的文件格式,比如Sequence File,RCFile、ORC等,这些文件格式后面都会详细讲到。
-使用支持切分的压缩格式,例如:Bzip2和Lzo。这个主要是针对textFile文件格式。
-提前把一个大文件拆分成多个块,每个块单独压缩,这样就不需要考虑是否可切分的问题了。

压缩位置

1
2
3
在MapReduce的整个过程中,可以在两个地方设置数据压缩格式:
一个是针对Map阶段的输出数据进行压缩。
一个是针对Reduce阶段的输出数据进行压缩。

image-20230617225343130

1
针对Map阶段的输出数据:建议选择压缩和解压速度快的压缩格式。Map阶段的数据落盘后会通过Shuffle,也就是通过网络传输到Reduce端。压缩Map的输出是可以提高网络传输效率的。但是压缩Map的输出会增加CPU的消耗。Map阶段在处理数据的时候自己本来就会消耗过多的CPU,所以此时应该重点考虑使用压缩和解压速度比较快的LZO、Snappy。
1
2
3
针对Reduce阶段的输出数据:需要分为两种场景。
如果结果数据是永久保存,此时需要重点考虑压缩效果比较好的Bzip2和Gzip。
如果结果数据还需要让另一个MapReduce任务继续计算,则需要重点考虑压缩后的数据文件是否支持切分。比如:Bzip2、Lzo。

数据压缩格式案例实战

image-20230621114254246

1
前面我们对常见的数据压缩格式有了一定的了解,下面我们通过一个案例具体演示一下这些数据压缩格式的使用。
1
2
3
4
5
6
这个图里面显示的是未压缩的数据文件和各种压缩格式压缩后的数据文件的大小。
其中未压缩的数据文件大小为2267M,图中按照压缩效果进行排列,最右侧的是压缩效果最好的,Bzip2压缩后的文件只有203M。

这些数据指标是我在本地集群通过MapReduce对数据文件压缩后得出的结果,下面我们来看一下具体的过程,正好也可以感受一下在MapReduce中如何对数据进行压缩。

注意:这个压缩后的数据文件大小,和原始数据内容也是有关系的,所以这里的数值大小仅作为参考,但是都是符合这个规律的。
1
2
3
下面我们来看一下具体的压缩过程:
我们首先基于之前开发的WordCountJobQueue代码重新复制出一个MrDataCompress(新建一个compress包)。
核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package com.imooc.compress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**
* 数据压缩格式案例
* Created by xuwei
*/
public class MrDataCompress {
/**
* Map阶段
*/
public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
String[] words = v1.toString().split(" ");
for (String word : words) {
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);
}
}
}


/**
* Reduce阶段
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
long sum = 0L;
for(LongWritable v2: v2s){
sum += v2.get();
}

Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
context.write(k3,v3);
}
}

/**
* 组装Job=Map+Reduce
*/
public static void main(String[] args) {
try{
//指定Job需要的配置参数
Configuration conf = new Configuration();
//解析命令行中通过-D传递过来的参数,添加到conf中
String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

//创建一个Job
Job job = Job.getInstance(conf);

//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
job.setJarByClass(MrDataCompress.class);

//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));

//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);


//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3的类型
job.setOutputKeyClass(Text.class);
//指定v3的类型
job.setOutputValueClass(LongWritable.class);

//提交job
job.waitForCompletion(true);
}catch(Exception e){
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
开发一个生成测试数据的代码:生成一个2G的文件。

package com.imooc.compress;

import java.io.BufferedWriter;
import java.io.FileWriter;

/**
* 生成测试数据
* Created by xuwei
*/
public class GenerateDat {
public static void main(String[] args) throws Exception{
String fileName = "D:\\words.dat";
System.out.println("start: 开始生成2G文件->"+fileName);
BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
int num = 0;
while(num<75000000){
bfw.write("hello_"+num+" you_"+num);
bfw.newLine();
num ++;
if(num%10000==0){
bfw.flush();
}
}
System.out.println("end: 2G文件已生成");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
将生成的2G文件上传HDFS上。
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -put words.dat /

将MrDataCompress打成jar包也上传到bigdata04节点上面。
D:\IdeaProjects\db_hadoop>mvn clean package -DskipTests
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.775s
[INFO] Finished at: Wed Mar 09 16:14:14 CST 2022
[INFO] Final Memory: 27M/415M
[INFO] ------------------------------------------------------------------------
1
2
3
注意:在执行下面任务的时候,集群容易出现磁盘不够用的情况,可能会导致DataNode节点状态异常,进而导致集群不可用。

因为HDFS我们已经用了很长时间了,里面可能存储了很多的数据,导致存储空间不足。可以到HDFS的web界面中查看当前磁盘空间的使用情况。

image-20230617230802403

1
2
如果发现MapReduce任务提交到集群上却无法正常执行,一直处于等待状态,排查集群发现进程都在。
此时可以到YARN的界面上看一下,确认一下ActiveNode中的数字是否是2,表示2个可用节点。

image-20230617230946191

1
2
3
因为我们的虚拟机磁盘默认分配的是20G,空间不是很大,集群使用一段时间之后,HDFS中的数据就比较多了,磁盘就不够用了,找到HDFS中的一些大文件删除一下即可,并且把回收站中的内容也清理一下。

使用这个命令可以查看hdfs中磁盘空间的占用情况。
1
[root@bigdata04 ~]# hdfs dfs -du -h /
1
2
3
4
5
注意:在删除HDFS中大文件的时候最好带上skipTrash,这样删除的文件就不会进到回收站了,否则删完以后HDFS的空间依然不会释放。

hdfs dfs -rm -r -skipTrash /out

修复之后等一会集群会自动把节点从非健康状态转为活跃状态,这样任务就可以获取到资源继续执行了。

执行未压缩的任务

1
2
3
4
5
6
7
8
9
10
11
12
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress /words.dat /words_out/non_compress

产生的结果数据是这样的:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/non_compress
Found 2 items
-rw-r--r-- 2 root supergroup 0 2027-08-24 17:41 /words_out/non_compress/_SUCCESS
-rw-r--r-- 2 root supergroup 2377777780 2027-08-24 17:41 /words_out/non_compress/part-r-00000

为了减少HDFS空间占用,查看后建议删除此文件。
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/non_compress

2.2G

执行Deflate压缩格式的任务

1
2
3
此时需要用到Deflate压缩格式实现类的全类名,其实这些压缩格式的实现类在MapReduce的源码中是可以找到的。
我们前面分析了MapReduce中Inputformat的源码,再回到这块源码里面。
找到LineRecordReader中的initialize方法:
1
2
3
4
5
6
7
8
9
10
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
.......
//如果文件是压缩文件,则执行if中的语句
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
......
}
1
2
3
4
5
6
找到SplittableCompressionCodec接口,这个接口继承了CompressionCodec接口,查看CompressionCodec接口的实现类:
org.apache.hadoop.io.compress.DeflateCodec == org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.GzipCodec == org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
1
2
3
4
5
6
7
其实通过代码也可以看到这些压缩格式是否可以支持切分。
如果codec是SplittableCompressionCodec的实现类,则可以切分。
查看SplittableCompressionCodec的实现类,发现只有BZip2这个子类。
因为Lzo不是默认支持的,所以这里没有显示。

针对Reduce输出结果进行压缩。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec /words.dat /words_out/compress_deflate
1
2
3
4
5
6
7
产生的结果数据是这样的:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/compress_deflate
Found 2 items
-rw-r--r-- 2 root supergroup 0 2027-08-24 18:18 /words_out/compress_deflate/_SUCCESS
-rw-r--r-- 2 root supergroup 363001985 2027-08-24 18:18 /words_out/compress_deflate/part-r-00000.deflate

316M
1
2
3
4
5
其实前面通过代码层面可以验证Deflate压缩格式的数据文件不支持切分,不过最好还是能通过代码验证一下。

通过程序验证Deflate压缩格式的数据是否支持切分。
继续使用之前的代码读取压缩后的数据即可。(mapreduce可以自动识别压缩后的数据)
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec /words_out/compress_deflate /words_out/compress_deflate_test
1
2
3
4
5
然后关注下面的日志信息,主要查看number of splits后面的值,这个值表示针对任务的输入数据会产生几个Split。
2027-08-24 18:23:13,002 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.182.100:8032
2027-08-24 18:23:13,815 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1819095495185_0004
2027-08-24 18:23:14,179 INFO input.FileInputFormat: Total input files to process : 1
2027-08-24 18:23:14,678 INFO mapreduce.JobSubmitter: number of splits:1
1
2
3
4
5
这里显示的是1,说明 /words_out/compress_deflate/part-r-00000.deflate这个346M的文件只会产生1个Split,最终也只会产生1个map任务。

验证之后把这个任务停掉即可。

[root@bigdata04 hadoop-3.2.0]# yarn application -kill application_1819095495185_0004
1
2
3
4
为了减少HDFS空间占用,查看后建议删除生成的压缩文件。

[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_deflate
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_deflate_test

执行Gzip压缩格式的任务

1
2
针对Reduce输出结果进行压缩。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec /words.dat /words_out/compress_gzip
1
2
3
4
5
6
7
产生的结果数据是这样的:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/compress_gzip
Found 2 items
-rw-r--r-- 2 root supergroup 0 2027-08-25 10:24 /words_out/compress_gzip/_SUCCESS
-rw-r--r-- 2 root supergroup 363001997 2027-08-25 10:24 /words_out/compress_gzip/part-r-00000.gz

346M
1
2
3
为了减少HDFS空间占用,查看后建议删除生成的压缩文件。
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_gzip
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_gzip_test
1
2
3
4
5
6
7
8
9
10
验证Gzip压缩格式的数据是否支持切分。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec /words_out/compress_gzip /words_out/compress_gzip_test
2027-08-25 10:28:36,434 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.182.100:8032
2027-08-25 10:28:37,231 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1819159668196_0002
2027-08-25 10:28:37,522 INFO input.FileInputFormat: Total input files to process : 1
2027-08-25 10:28:37,606 INFO mapreduce.JobSubmitter: number of splits:1
这里显示的是1,说明这个346M的文件没有被切分。

验证之后把这个任务停掉即可。
[root@bigdata04 hadoop-3.2.0]# yarn application -kill application_1819159668196_0002

执行Bzip2压缩格式的任务

1
2
3
4
5
6
7
8
9
10
针对Reduce输出结果进行压缩。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec /words.dat /words_out/compress_bzip2

产生的结果数据是这样的:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/compress_bzip2
Found 2 items
-rw-r--r-- 2 root supergroup 0 2027-08-25 10:42 /words_out/compress_bzip2/_SUCCESS
-rw-r--r-- 2 root supergroup 213226630 2027-08-25 10:42 /words_out/compress_bzip2/part-r-00000.bz2

203M
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
验证Bzip2压缩格式的数据是否支持切分。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec /words_out/compress_bzip2 /words_out/compress_bzip2_test
2027-08-25 10:47:21,510 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.182.100:8032
2027-08-25 10:47:22,264 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1819159668196_0004
2027-08-25 10:47:22,597 INFO input.FileInputFormat: Total input files to process : 1
2027-08-25 10:47:22,663 INFO mapreduce.JobSubmitter: number of splits:2

这里显示的是2,说明这个203M的文件被切分成了2个Split,最终就会产生2个Map任务。
所以从这也可以验证Bzip2压缩格式的数据文件是可以被切分的。

验证之后把这个任务停掉即可。
[root@bigdata04 hadoop-3.2.0]# yarn application -kill application_1819159668196_0004

为了减少HDFS空间占用,查看后建议删除生成的压缩文件。
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_bzip2
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_bzip2_test

执行Lz4压缩格式的任务

1
2
3
4
5
6
7
8
9
10
针对Reduce输出结果进行压缩。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.Lz4Codec /words.dat /words_out/compress_lz4

产生的结果数据是这样的:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/compress_lz4
Found 2 items
-rw-r--r-- 2 root supergroup 0 2027-08-25 10:57 /words_out/compress_lz4/_SUCCESS
-rw-r--r-- 2 root supergroup 589955900 2027-08-25 10:57 /words_out/compress_lz4/part-r-00000.lz4

562M
1
2
3
4
5
6
7
8
9
10
11
12
验证Lz4压缩格式的数据是否支持切分。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.Lz4Codec /words_out/compress_lz4 /words_out/compress_lz4_test
2027-08-25 10:58:56,842 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.182.100:8032
2027-08-25 10:58:57,612 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1819159668196_0006
2027-08-25 10:58:57,872 INFO input.FileInputFormat: Total input files to process : 1
2027-08-25 10:58:57,946 INFO mapreduce.JobSubmitter: number of splits:1

这里显示的是1,说明这个562M的文件没有被切分。

验证之后把这个任务停掉即可。

为了减少HDFS空间占用,查看后建议删除生成的压缩文件。

执行Snappy压缩格式的任务

1
2
3
4
5
6
7
8
9
10
针对Reduce输出结果进行压缩。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec /words.dat /words_out/compress_snappy

产生的结果数据是这样的:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/compress_snappy
Found 2 items
-rw-r--r-- 2 root supergroup 0 2027-08-25 11:10 /words_out/compress_snappy/_SUCCESS
-rw-r--r-- 2 root supergroup 682743908 2027-08-25 11:10 /words_out/compress_snappy/part-r-00000.snappy

651
1
2
3
4
5
6
7
8
9
10
11
12
验证Snappy压缩格式的数据是否支持切分。
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec /words_out/compress_snappy /words_out/compress_snappy_test
2027-08-25 11:13:35,799 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.182.100:8032
2027-08-25 11:13:36,871 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1819159668196_0008
2027-08-25 11:13:37,177 INFO input.FileInputFormat: Total input files to process : 1
2027-08-25 11:13:37,355 INFO mapreduce.JobSubmitter: number of splits:1

这里显示的是1,说明这个651M的文件没有被切分。

验证之后把这个任务停掉即可。

为了减少HDFS空间占用,查看后建议删除生成的压缩文件。

执行Lzo压缩格式的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
注意:需要先在Hadoop集群中集成Lzo。

(1)在Hadoop集群中集成Lzo。

1.将编译好的hadoop-lzo-0.4.21-SNAPSHOT.jar(这个jar包是我在本地基于Hadoop3.2.0版本编译的)上传到Hadoop集群的/data/soft/hadoop-3.2.0/share/hadoop/common目录中。

注意:需要上传到集群中的所有节点里面。
首先上传到bigdata01节点上。
[root@bigdata01 ~]# ll /data/soft/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar
-rw-r--r--. 1 root root 188706 Mar 10 2022 /data/soft/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar

然后同步到bigdata02和bigdata03中。
[root@bigdata01 ~]# scp -rq /data/soft/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar bigdata02:/data/soft/hadoop-3.2.0/share/hadoop/common
[root@bigdata01 ~]# scp -rq /data/soft/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar bigdata03:/data/soft/hadoop-3.2.0/share/hadoop/common

2.修改hadoop集群中的core-site.xml文件。
首先在bigdata01中修改core-site.xml文件。

注意:将下面内容添加到core-site.xml中,不是覆盖,是追加。

<!-- 配置使用的各种压缩算法的编/解码器 -->
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.DeflateCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
org.apache.hadoop.io.compress.Lz4Codec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>

<!-- 配置Lzo编解码器相关参数 -->
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
1
2
3
4
5
6
7
8
9
然后同步到bigdata02和bigdata03中。
[root@bigdata01 ~]# cd /data/soft/hadoop-3.2.0/etc/hadoop/
[root@bigdata01 hadoop]# scp -rq core-site.xml bigdata02:/data/soft/hadoop-3.2.0/etc/hadoop/
[root@bigdata01 hadoop]# scp -rq core-site.xml bigdata03:/data/soft/hadoop-3.2.0/etc/hadoop/

重启集群。
需要重启才能生效。
[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(2)使用Lzo压缩。
针对Reduce输出结果进行压缩。

注意:需要使用LzopCodec,而不是LzoCodec。

如果mapreduce.output.fileoutputformat.compress.codec参数设置为com.hadoop.compression.lzo.LzoCodec,则MapReduce任务输出文件为.lzo_deflate

若此参数设置为com.hadoop.compression.lzo.LzopCodec,则MapReduce任务输出文件为.lzo,只有.lzo压缩文件才支持建立索引,进而支持压缩文件分片。

[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /words.dat /words_out/compress_lzo

产生的结果数据是这样的:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/compress_lzo
Found 2 items
-rw-r--r-- 2 root supergroup 0 2027-08-25 12:08 /words_out/compress_lzo/_SUCCESS
-rw-r--r-- 2 root supergroup 642730761 2027-08-25 12:08 /words_out/compress_lzo/part-r-00000.lzo

612M
1
2
3
4
5
6
7
8
9
10
11
12
13
14
验证Lzo压缩格式的数据是否支持切分。

注意:在读取Lzo文件的时候需要修改Inputformat,使用Lzo对应的Inputformat。使用默认的TextFileInputformat(使用其它之前的压缩格式,在读取时,默认都是这个)会报错。

此时需要将hadoop-lzo-0.4.21-SNAPSHOT.jar拷贝到提交任务的客户端节点上(之前上传到了bigdata01-03),否则在这个节点提交任务会提示找不到LzoInputFormat
[root@bigdata01 ~]# scp -rq /data/soft/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar bigdata04:/data/soft/hadoop-3.2.0/share/hadoop/common

[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /words_out/compress_lzo /words_out/compress_lzo_test
2027-08-25 12:12:00,565 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.182.100:8032
2027-08-25 12:12:01,465 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1819164281247_0007
2027-08-25 12:12:01,714 INFO input.FileInputFormat: Total input files to process : 1
2027-08-25 12:12:01,796 INFO mapreduce.JobSubmitter: number of splits:1

这里显示的是1,说明这个612M的Lzo文件没有被切分。
1
2
3
4
5
6
7
8
9
10
11
针对Lzo文件,想要实现可切分,需要对Lzo文件创建索引。

对Lzo文件创建索引。
[root@bigdata04 hadoop-3.2.0]# hadoop jar /data/soft/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar com.hadoop.compression.lzo.DistributedLzoIndexer /words_out/compress_lzo

运行结束后,会在相同路径下生成xxxx.lzo.index索引文件。
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -ls /words_out/compress_lzo
Found 3 items
-rw-r--r-- 2 root supergroup 0 2027-08-25 12:08 /words_out/compress_lzo/_SUCCESS
-rw-r--r-- 2 root supergroup 642730761 2027-08-25 12:08 /words_out/compress_lzo/part-r-00000.lzo
-rw-r--r-- 2 root supergroup 77432 2027-08-25 12:35 /words_out/compress_lzo/part-r-00000.lzo.index
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
重新验证一下Lzo文件的切分效果。
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_lzo_test
Deleted /words_out/compress_lzo_test
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.compress.MrDataCompress -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /words_out/compress_lzo /words_out/compress_lzo_test
2027-08-25 12:37:58,151 INFO client.RMProxy: Connecting to ResourceManager at bigdata01/192.168.182.100:8032
2027-08-25 12:37:58,949 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1819164281247_0010
2027-08-25 12:37:59,214 INFO input.FileInputFormat: Total input files to process : 2
2027-08-25 12:37:59,338 INFO mapreduce.JobSubmitter: number of splits:5

这个时候发现,这个612M的Lzo文件被切分成了5个Split。

验证之后把这个任务停掉即可。
[root@bigdata04 hadoop-3.2.0]# yarn application -kill application_1819164281247_0010

为了减少HDFS空间占用,查看后建议删除生成的压缩文件。
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_lzo
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -rm -r -skipTrash /words_out/compress_lzo_test

2 常见数据存储格式的使用

1
2
在最开始学习Hive的时候我们说到了,Hive没有专门的数据存储格式,默认可以直接加载文本文件TextFile,还支持SequenceFile、RCFile这些。
其实完整来说,主要包括下面这些数据存储格式。

image-20230618003521698

1
2
3
4
5
6
7
其中RCFile数据存储格式是从Hive 0.6版本开始支持的。
Avro数据存储格式是从Hive 0.9版本开始支持的。
ORC数据存储格式是从Hive 0.11版本开始支持的。
PARQUET数据存储格式是Hive 0.13版本开始支持的。

这些信息主要来源于Hive官网。
https://cwiki.apache.org/confluence/display/Hive/

image-20230618003613888

image-20230618003628299

1
2
3
4
5
6
7
8
目前工作中使用最多的是TextFile、ORC和Parquet。

默认情况下使用TextFile即可,想要提高数据存储和计算效率,可以考虑使用ORC或者Parquet。

本次课程中我们主要演示TextFile、SequenceFile、RCFile、ORC、以及PARQUET的用法。
Avro存储格式在这里不再详细分析,因为这个基本上用不到。

下面首先分析一下TextFile。

数据存储格式之TextFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
TextFile是Hive的默认数据存储格式,基于行存储。
它的主要特点是磁盘存储开销大,数据解析开销大。

怎么理解呢?

磁盘存储开销大:因为存储的是原始文件内容,没有使用压缩,所以存储开销会比较大。
数据解析开销大:因为在反序列化读取数据的过程中,必须逐个字符判断是不是字段分隔符和行结束符,所以数据解析开销大。

当然了,如果想要减少磁盘存储开销,也可以对TextFile格式的数据进行压缩,但是部分压缩格式在Hive中无法切割。

数据的压缩格式其实是在MapReduce中提出的,因为Hive底层支持MapReduce,所以Hive中也支持这些压缩格式。

在这里做一个试验,验证一下Hive中如何集成这些数据压缩格式,以及对数据切分的支持程度。

首先在Hive中创建一个不带压缩的普通表。
1
2
3
4
5
6
7
8
create external table stu_textfile(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
location 'hdfs://bigdata01:9000/stu_textfile';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
然后生成测试数据,并把测试数据加载到这个表中。
生成测试数据的代码如下:

package com.imooc.hive;

import java.io.BufferedWriter;
import java.io.FileWriter;

/**
* Created by xuwei
*/
public class GenerateHiveData {
public static void main(String[] args) throws Exception{
String fileName = "D:\\stu_textfile.dat";
System.out.println("start: 开始生成文件->"+fileName);
BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
int num = 0;
while(num<60000000){
bfw.write("1"+num+",zs"+num+",beijing"+num);
bfw.newLine();
num ++;
if(num%10000==0){
bfw.flush();
}
}
System.out.println("end: 文件已生成");
}
}
1
2
3
4
5
6
7
8
9
10
将这个数据文件上传到Hive表stu_textfile对应的hdfs目录中。

[root@bigdata04 soft]# hdfs dfs -put stu_textfile.dat /stu_textfile

确认通过表stu_textfile是否可以查到数据:
hive (default)> select * from stu_textfile limit 1;
OK
stu_textfile.id stu_textfile.name stu_textfile.city
10 zs0 beijing0
Time taken: 1.936 seconds, Fetched: 1 row(s)

使用Deflate压缩格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
接下来基于stu_textfile这个普通数据表构建一个新的压缩数据表。
创建这个压缩数据表:建表语句和普通表没有区别,唯一的区别就是在向表中添加数据的时候指定数据压缩格式。
create external table stu_textfile_deflate_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
location 'hdfs://bigdata01:9000/stu_textfile_deflate_compress';

Hive中默认是没有开启压缩的,想要开启压缩需要使用下面这条命令:
hive (default)> set hive.exec.compress.output=true;
hive (default)> set mapreduce.output.fileoutputformat.compress=true;
hive (default)> set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec;

这条命令表示开启Hive输出数据压缩功能,并且设置使用deflate压缩格式。

如果后期不确定使用的是哪种压缩格式,可以通过这条命令查看:
hive (default)> set mapreduce.output.fileoutputformat.compress.codec;
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
接下来通过insert into select从普通表中查询数据,插入到压缩表中。

注意:为了能够控制任务最终产生的数据文件个数,在这里通过mapreduce.job.reduces来控制,并且SQL语句中需要有可以产生shuffle的操作,如果是普通的select语句,最终是不会产生Reduce任务的,那么mapreduce.job.reduces这个参数就无法生效了。

在这里我们设置mapreduce.job.reduces=1,表示将结果数据写入到一个数据文件中,这也便于后面验证这个数据文件是否支持Split。
hive (default)> set mapreduce.job.reduces=1;
hive (default)>insert into stu_textfile_deflate_compress select id,name,city from stu_textfile group by id,name,city;

最终产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_textfile_deflate_compress
Found 1 items
-rw-r--r-- 2 root supergroup 407444004 2027-08-25 17:18 /stu_textfile_deflate_compress/000000_0.deflate

这个结果数据文件大小为388M。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
接下来写一个sql查询压缩表中的数据,确认一下是否会生成多个Map任务。
hive (default)> select id,count(*) from stu_textfile_deflate_compress group by id;
Query ID = root_20270825172055_a11694bd-92f8-4e1b-b813-ddf31f0fa746
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819179801908_0003, Tracking URL = http://bigdata01:8088/proxy/application_1819179801908_0003/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819179801908_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
1
2
3
4
5
6
在这里发现这个SQL最终只生成了1个Map任务,也就意味着这个388M的文件是有一个Map任务负责计算。

所以在Hive中,deflate压缩格式的特点和我们前面分析的也是一样的。

这个压缩表中的数据查看后最好删除一下,这样可以释放HDFS存储空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_textfile_deflate_compress

使用Bzip2压缩格式

1
2
3
4
5
6
7
8
9
10
11
再来验证一下Bzip2压缩格式。
创建压缩表:

create external table stu_textfile_bzip2_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
location 'hdfs://bigdata01:9000/stu_textfile_bzip2_compress';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
指定bzip2压缩格式:
hive (default)> set hive.exec.compress.output=true;
hive (default)> set mapreduce.output.fileoutputformat.compress=true;
hive (default)> set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec;

从普通表中查询数据插入到压缩表中。
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_textfile_bzip2_compress select id,name,city from stu_textfile group by id,name,city;

最终产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_textfile_bzip2_compress
Found 1 items
-rw-r--r-- 2 root supergroup 199210105 2027-08-25 17:37 /stu_textfile_bzip2_compress/000000_0.bz2

这个数据文件大小为189M。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
接下来写一个sql查询压缩表中的数据,确认一下是否会生成多个Map任务。
hive (default)> select id,count(*) from stu_textfile_bzip2_compress group by id;
Query ID = root_20270825174140_50d2ff59-69ea-4f8f-a149-76897b7a8b90
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819179801908_0006, Tracking URL = http://bigdata01:8088/proxy/application_1819179801908_0006/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819179801908_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
结果发现只产生了1个Map任务,正常情况下这份数据会被切分成2个Split,产生2个Map任务的,为什么这里只有1个Map任务呢?

难道是在Hive中,Bzip2不支持切分了?
不是这样的。
原因是:因为Hive中默认的Split大小是244M,这个结果文件小于244M,所以Hive最终只切分了1个Split,也就只产生了1个Map任务。

Hive在读取数据的时候目前使用的是CombineHiveInputFormat,可以这样确认一下:
hive (default)> set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;


CombineHiveInputFormat中在切分Split的时候会参考mapred.max.split.size参数的值。
mapred.max.split.size参数的值在Hive中默认是244M。
hive (default)> set mapred.max.split.size;
mapred.max.split.size=256000000

所以Bzip2还是支持切分的,只是因为这个文件大小没有超过244M。
我们可以在这里调整mapred.max.split.size参数的值,改为128M来验证一下。
hive (default)> set mapred.max.split.size=134217728;
hive (default)> select id,count(*) from stu_textfile_bzip2_compress group by id;
Query ID = root_20270825174607_43ac6b00-93e6-4a96-b53b-75ddd1dddfd9
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819179801908_0007, Tracking URL = http://bigdata01:8088/proxy/application_1819179801908_0007/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819179801908_0007
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
1
2
3
这个时候发现产生了2个Map任务,最终可以确认,在Hive中Bzip2压缩文件也是支持切分的。
这个压缩表中的数据查看后最好也删除一下,这样可以释放HDFS存储空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_textfile_bzip2_compress

数据存储格式之SequenceFile

1
2
3
4
5
6
7
8
9
10
11
12
下面我们来看一下SequenceFile这种存储格式
其实我们前面在Hadoop课程中分析小文件解决方案的时候就讲过SequenceFile了。

它是一种二进制文件,内部数据是<Key,Value>的形式,也属于行存储。

它的特点是使用方便,MapReduce原生支持这种数据格式,并且它还支持切分,也支持压缩。

可以支持NONE、RECORD、BLOCK 级别的压缩。

NONE表示不压缩
RECORD表示行级别的压缩
Block表示块级别的压缩
1
2
3
4
5
由于Record是针对每一条数据分别进行压缩,压缩率比较低,所以一般都会选择Block压缩。

下面我们来演示一下具体的用法。

注意:为了避免在学习阶段导致Hive中参数混乱,建议每次验证不同数据存储格式的时候都重新开启一个新的Hive会话。

不使用压缩

1
2
3
4
5
6
7
8
9
10
11
12
先不使用压缩,创建一个SequenceFile存储格式的表。
注意:在这需要使用stored as指定sequencefile存储格式,不指定的话默认是textfile。所以我们前面在创建表的时候其实都省略了这个配置。

create external table stu_seqfile_none_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as sequencefile
location 'hdfs://bigdata01:9000/stu_seqfile_none_compress';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
确认一下这个表:
hive (default)> show create table stu_seqfile_none_compress;
OK
createtab_stmt
CREATE EXTERNAL TABLE `stu_seqfile_none_compress`(
`id` int,
`name` string,
`city` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'=',',
'line.delim'='\n',
'serialization.format'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.SequenceFileInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
LOCATION
'hdfs://bigdata01:9000/stu_seqfile_none_compress'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1819270622')
Time taken: 0.082 seconds, Fetched: 19 row(s)
1
2
3
4
5
6
7
8
9
10
11
12
如果INPUTFORMAT使用的是SequenceFileInputFormat,说明这个表中需要存储SequenceFile格式的数据。

然后从普通表中查询数据导入到这个SequenceFile存储格式的表中。
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_seqfile_none_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_seqfile_none_compress
Found 1 items
-rw-r--r-- 2 root supergroup 2907234377 2027-08-26 11:25 /stu_seqfile_none_compress/000000_0

注意:这个文件在2.7G左右,比原始的TextFile文件还要大,原始的TextFile文件才2.09G。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
验证是否支持切分:
hive (default)> hive (default)> select id,count(*) from stu_seqfile_none_compress group by id;
Query ID = root_20270826113636_fb67ba8f-82d7-4672-9641-3b34b90282d1
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819247895426_0003, Tracking URL = http://bigdata01:8088/proxy/application_1819247895426_0003/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819247895426_0003
Hadoop job information for Stage-1: number of mappers: 11; number of reducers: 1

可以看到产生了11个map任务,说明SequenceFile格式的文件支持切分。

这个SequenceFile格式的文件中的数据是什么样子的,我们可以查看一下,之前我们在学习Hadoop的时候写过读取SequenceFile文件的代码,基于之前的代码,创建一个类:SeqRead
详细代码如下:

注意:这个SequenceFile文件中的key是空的,value是每一条数据的内容。
key对应的是BytesWritable类型。value是Text类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.compress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/**
* Created by xuwei
*/
public class SeqRead {
public static void main(String[] args) throws Exception{
//创建一个配置对象
Configuration conf = new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS","hdfs://bigdata01:9000");
//seq文件路径
String inputFile = "/stu_seqfile_none_compress/000000_0";
//创建阅读器
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
BytesWritable key = new BytesWritable();
Text value = new Text();
//循环读取数据
while(reader.next(key,value)){
//输出文件名称
System.out.print("文件名:"+new String(key.getBytes())+",");
//输出文件内容
System.out.println("文件内容:"+value.toString()+"");
}
reader.close();
}
}
1
2
3
4
5
6
7
执行代码,可以看到这样的结果:
.......
文件名:,文件内容:1152574,zs152574,beijing152574
文件名:,文件内容:1152575,zs152575,beijing152575
文件名:,文件内容:1152576,zs152576,beijing152576
文件名:,文件内容:1152577,zs152577,beijing152577
.......
1
2
3
4
这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_seqfile_none_compress

现在这个表没有使用压缩,接下来我们创建一个带有压缩的表。

使用Deflate压缩(Block级别)

1
2
3
4
5
6
7
8
9
10
11
接下来使用Deflate压缩试验一下,基于Block压缩级别
首先创建表:
create external table stu_seqfile_deflate_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as sequencefile
location 'hdfs://bigdata01:9000/stu_seqfile_deflate_compress';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
指定使用deflate压缩,Block级别。
hive (default)> set hive.exec.compress.output=true;
hive (default)> set mapreduce.output.fileoutputformat.compress=true;
hive (default)> set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec;
hive (default)> set io.seqfile.compression.type=BLOCK;

向表中导入数据:
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_seqfile_deflate_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_seqfile_deflate_compress
Found 1 items
-rw-r--r-- 2 root supergroup 2568183982 2027-08-26 11:54 /stu_seqfile_deflate_compress/000000_0

压缩之后的数据小一些,2.39G。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
验证是否支持切分:
hive (default)> select id,count(*) from stu_seqfile_deflate_compress group by id;
Query ID = root_20270826115618_807f09e0-f520-4ed7-8ea5-69492d18ed5c
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819247895426_0006, Tracking URL = http://bigdata01:8088/proxy/application_1819247895426_0006/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819247895426_0006
Hadoop job information for Stage-1: number of mappers: 10; number of reducers: 1

可以看到产生了10个map任务,说明SequenceFile格式带压缩的文件也支持切分。
1
2
3
4
5
6
这个怎么理解呢?

因为SequenceFile这个文件自身是支持切分的,我们现在的压缩是针对文件内部的数据进行压缩,并不会改变SequenceFile的特性。这一点和TextFile就不一样了。

所以说在SequenceFile中使用压缩的时候就不需要考虑压缩格式自身是否支持切分的特性了,主要考虑的是压缩格式的压缩比、以及压缩解压速度这些指标。
看一下下面这个图可以帮助理解:

image-20230618224551905

1
2
3
上面表示是SequenceFile中的数据,每一个Record代表里面的一条数据。
当没有压缩的时候,Record内部存储的是Record的长度、key的长度、key和value。
当使用Record级别压缩的时候,Record内部存储的是Record的长度、key的长度、key和压缩之后的Value。

image-20230618224907971

1
2
3
4
5
6
当使用Block级别压缩的时候,SequenceFile中的数据是以Block为单位存储的,每个Block中存储多个Record,并且对Block内部的多个Record统一压缩存储。

所以这里的压缩是针对SequenceFile内部数据的压缩,并没有改变SequenceFile自身的特性,所以他依然是可以支持切分的。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_seqfile_deflate_compress

数据存储格式之RCFile

1
2
3
4
5
6
7
8
9
10
11
接下来我们来看一下RCFile这个数据存储格式。
RCFile是专门为Hive设计的数据存储格式。

数据会首先按照行分组,每个组内部按照列存储。
他整合了行存储和列存储的优点,可以称为是行列式存储,大层面还是属于列式存储的。

他的主要特点是压缩速度快,可分割,支持快速列存取。

在读取所有列的情况下,RCFile的性能没有SequenceFile高,不过在实际工作中大部分的统计分析需求都不会读取所有列。

接下来通过一个图来直观的感受一下RCFile的数据存储格式。

image-20230618225430788

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
左边的Relation表示关系型数据库中的数据存储形式,可以把它认为是一张表,表中有A\B\C\D这几个列。

第一行数据中 A的值为101,B的值为111,C的值为121,D的值为131。
后面的以此类推。

针对这份数据,如果使用RCFile格式进行存储是这样的,看中间这个图:
RCFile是存储在HDFS里面的,所以说会有多个HDFS的Block块。
每个Block块内部包含多个Row Group。RowGroup可以翻译为:行组。

RowGroup里面包含的是具体的数据,存储格式是这样的,看右边这个图:
这里面的第一行数据是所有A字段的值,因为RowGroup是行列式存储,所以会把原始表中的每个列的数据存储到一起,便于查询这个列的数据。

第二行是所有B字段的值。后面的以此类推。

所以从这个图里面可以看出来,RCFile会把数据首先按照行分成Row Group,在Row Group内部按照列存储,每个列的数据存储在一起。
1
2
3
下面我们来演示一下具体的用法。

注意:为了避免在学习阶段到导致Hive中参数混乱,建议每次验证不同数据存储格式的时候都重新开启一个新的Hive会话。

不使用压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
先不使用压缩,创建一个RCFile存储格式的表。

注意:这里需要在stored as后面指定rcfile
create external table stu_rcfile_none_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as rcfile
location 'hdfs://bigdata01:9000/stu_rcfile_none_compress';

确认一下这个表:
hive (default)> show create table stu_rcfile_none_compress;
OK
createtab_stmt
CREATE EXTERNAL TABLE `stu_rcfile_none_compress`(
`id` int,
`name` string,
`city` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
WITH SERDEPROPERTIES (
'field.delim'=',',
'line.delim'='\n',
'serialization.format'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
LOCATION
'hdfs://bigdata01:9000/stu_rcfile_none_compress'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1819272229')
Time taken: 0.085 seconds, Fetched: 19 row(s)

如果INPUTFORMAT使用的是RCFileInputFormat,说明这个表中需要存储RCFile格式的数据。
1
2
3
4
5
6
7
8
9
10
然后从普通表中查询数据导入到这个RCFile存储格式的表中。
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_rcfile_none_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_rcfile_none_compress
Found 1 items
-rw-r--r-- 2 root supergroup 1771023524 2027-08-26 16:41 /stu_rcfile_none_compress/000000_0

这个结果文件大小为1.6G,咱们前面使用SequenceFile时文件大小达到了2.7G,原始的TextFile格式大小为2.09G,这样可以说明RCFile在数据存储层面还是比较优秀的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
验证是否支持切分:
hive (default)> select id,count(*) from stu_rcfile_none_compress group by id;
Query ID = root_20270826164653_f1c9e0bc-3b75-4ad3-aec2-ce110acaf45b
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819268458021_0003, Tracking URL = http://bigdata01:8088/proxy/application_1819268458021_0003/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819268458021_0003
Hadoop job information for Stage-1: number of mappers: 7; number of reducers: 1
1
2
3
4
可以看到产生了7个map任务,说明RCFile格式的文件是支持切分的。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_rcfile_none_compress

使用Deflate压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
接下来使用Deflate压缩格式试验一下。
首先创建表:
create external table stu_rcfile_deflate_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as rcfile
location 'hdfs://bigdata01:9000/stu_rcfile_deflate_compress';

指定使用deflate压缩。
hive (default)> set hive.exec.compress.output=true;
hive (default)> set mapreduce.output.fileoutputformat.compress=true;
hive (default)>set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DeflateCodec;

向表中导入数据:
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_rcfile_deflate_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_rcfile_deflate_compress
Found 1 items
-rw-r--r-- 2 root supergroup 401071415 2027-08-26 17:23 /stu_rcfile_deflate_compress/000000_0

此时发现结果数据文件只有382M,并且现在用的压缩格式还是不是最优的。
和SequenceFile压缩后的文件大小2.39G相比,RCFile文件的存储效果很明显。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
验证是否支持切分:
hive (default)> select id,count(*) from stu_rcfile_deflate_compress group by id;
Query ID = root_20270826181418_c5b0b9d0-a0ed-4c18-9f1b-8fac75de7169
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819268458021_0011, Tracking URL = http://bigdata01:8088/proxy/application_1819268458021_0011/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819268458021_0011
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1

可以看到产生了2个map任务,说明RCFile格式带压缩的文件也支持切分。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_rcfile_deflate_compress

数据存储格式之ORC

1
2
3
4
下面我们来看一下ORC这个数据存储格式。
ORC是在RCfile的基础之上做了一些升级,在性能层面有大幅度提升。

ORC的存储格式是这样的:

image-20230618230530692

1
2
3
4
5
6
7
8
9
10
这个图来源于官网,大致可以看出来ORC中的数据首先会被划分为多个Stripe,每个Stripe 250M。Stripe表示ORC文件存储数据的地方。
在Stripe内部包含了Index Data,Row Data和Stripe Footer。

-Index Data这里面存储的是索引数据。
-Row Data里面存储的是具体的数据,这里面有多个行组,每10000行构成一个行组。
-Stripe Footer:里面存储的是数据所在的文件目录。

最下面的File Footer:里面包含了ORC文件中Stripe的列表、每个Stripe的行数,以及每个列的数据类型。他还包含了每个列的最小值、最大值等信息。

postscript:里面包含了压缩参数和压缩大小相关的信息。
1
这个图看起来稍微有点复杂,简化一下是这样的:

image-20230618230822325

1
2
3
4
5
6
7
从这个图里面可以看出来ORC存储格式和RCFile存储格式在存储形式上没有特别大的区别,核心思想还是行列式存储。

在创建ORC数据存储格式表的时候,所有关于ORC的参数都在建表语句中的tblproperties中指定,不需要在hive的命令行中使用set命令指定了。
create table t1(
id int,
name string
) stored as orc tblproperties("orc.compress"="NONE")
1
2
3
4
注意:tblproperties中可以指定多个参数,多个参数之间使用逗号分割即可。

ORC支持的参数主要包括这些:
在这里大家主要关注orc.compress这个参数即可。

image-20230618231020219

1
2
3
4
5
6
7
8
9
-orc.compress:表示ORC文件的压缩类型,支持NONE、ZLIB和SNAPPY,默认值是ZLIB。这里的ZLIB其实就和前面我们所说的Deflate压缩格式是类似的,因为Deflate压缩格式底层就是用的ZLIB。

-orc.compress.size:表示每个压缩块的大小,默认是256KB。这里面显示的是字节。

-orc.stripe.size:stripe是ORC文件中存储数据的单位。这个配置表示在生成stripe的时候可以使用的内存缓冲池大小,默认值是64M。这里面显示的是字节。
-orc.row.index.stride:行组级别索引的数据量大小,默认是10000,必须要设置为大于等于10000的数值。表示每10,000行数据,建一次索引。
-orc.create.index:是否创建行组级别索引,默认是true。
-orc.bloom.filter.columns:表示针对哪些列创建布隆过滤器,默认不创建。
-orc.bloom.filter.fpp:通过布隆过滤器可以通过较少的文件空间快速判断数据是否存在表中,但是也会存在将不属于这个表的数据判定为属于这个表的情况,这其实就是布隆过滤器的特性,这个特性可以称之为假正概率,我们可以调整这个概率。设置的概率越低,出现的误差就越小,但是布隆过滤器所需要的空间就越大。这个概率的取值范围在0~1之间,大于0,小于1。
1
2
注意:从官网文档资料上来看,ORC只支持NONE、ZLIB、SNAPPY这三种压缩格式。默认是ZLIB,到这可以确认一下。
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC

image-20230618231442497

1
2
3
4
5
其实目前ORC还支持其他压缩格式,这个等一会我们来验证一下。

下面我们来演示一下具体的用法。

注意:为了避免在学习阶段导致Hive中参数混乱,建议每次验证不同数据存储格式的时候都重新开启一个新的Hive会话。

不使用压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
先不使用压缩,创建一个ORC存储格式的表。

注意:这里需要在stored as后面指定orc,并且在tblproperties中指定"orc.compress"="NONE"。

针对ORC存储格式的表在控制压缩格式的时候是通过orc.compress指定的,不需要在hive中设置hive.exec.compress.output参数了。
create external table stu_orc_none_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as orc
location 'hdfs://bigdata01:9000/stu_orc_none_compress'
tblproperties("orc.compress"="NONE");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
确认一下这个表:
hive (default)> show create table stu_orc_none_compress;
OK
createtab_stmt
CREATE EXTERNAL TABLE `stu_orc_none_compress`(
`id` int,
`name` string,
`city` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'field.delim'=',',
'line.delim'='\n',
'serialization.format'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://bigdata01:9000/stu_orc_none_compress'
TBLPROPERTIES (
'bucketing_version'='2',
'orc.compress'='NONE',
'transient_lastDdlTime'='1819333874')
Time taken: 0.146 seconds, Fetched: 20 row(s)

如果INPUTFORMAT使用的是OrcInputFormat,说明这个表中需要存储ORC格式的数据。
1
2
3
4
5
6
7
8
9
10
然后从普通表中查询数据导入到这个RCFile存储格式的表中。
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_orc_none_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_orc_none_compress
Found 1 items
-rw-r--r-- 2 root supergroup 1480509363 2027-08-27 10:50 /stu_orc_none_compress/000000_0

这个结果文件大小为1.38G,比RCFile 的1.6G还要小一些,说明ORC确实是基于RCFile做了升级优化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
验证是否支持切分:
hive (default)> select id,count(*) from stu_orc_none_compress group by id;
Query ID = root_20270827105245_3b71317c-504b-4ee9-918b-d3519bba561f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819331557204_0004, Tracking URL = http://bigdata01:8088/proxy/application_1819331557204_0004/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819331557204_0004
Hadoop job information for Stage-1: number of mappers: 6; number of reducers: 1

可以看到产生了6个map任务,说明ORC格式的文件是支持切分的。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_orc_none_compress

使用Zlib压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
接下来使用Zlib压缩格式试验一下。
首先创建表,指定ZLIB压缩格式。
create external table stu_orc_zlib_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as orc
location 'hdfs://bigdata01:9000/stu_orc_zlib_compress'
tblproperties("orc.compress"="ZLIB");

向表中导入数据:
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_orc_zlib_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_orc_zlib_compress
Found 1 items
-rw-r--r-- 2 root supergroup 282168781 2027-08-27 11:03 /stu_orc_zlib_compress/000000_0

此时发现结果数据文件只有269 M。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
验证是否支持切分:
hive (default)> select id,count(*) from stu_orc_zlib_compress group by id;
Query ID = root_20270827110706_4557ff5c-a830-4adb-9325-653743b9d24e
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819331557204_0007, Tracking URL = http://bigdata01:8088/proxy/application_1819331557204_0007/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819331557204_0007
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1

可以看到产生了2个map任务,说明ORC格式带压缩的文件也支持切分。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_orc_zlib_compress

使用Snappy压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
接下来使用Snappy压缩格式试验一下。
首先创建表,指定SNAPPY压缩格式。
create external table stu_orc_snappy_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as orc
location 'hdfs://bigdata01:9000/stu_orc_snappy_compress'
tblproperties("orc.compress"="SNAPPY");

向表中导入数据:
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_orc_zlib_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_orc_snappy_compress
Found 1 items
-rw-r--r-- 2 root supergroup 555197693 2027-08-27 11:26 /stu_orc_snappy_compress/000000_0

此时发现结果数据文件有529M。比Zlib的压缩性能差一些,因为Zlib的压缩比确实比Snappy高,Snappy最主要的优点是压缩和解压速度快。

在实际工作中,针对ORC格式而言,最常见的是使用Snappy压缩,主要看中他的压缩和解压速度快。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
验证是否支持切分:
hive (default)> select id,count(*) from stu_orc_snappy_compress group by id;
Query ID = root_20270827112919_ec9cc867-5cb0-4f97-a275-21c66d761a47
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819331557204_0012, Tracking URL = http://bigdata01:8088/proxy/application_1819331557204_0012/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819331557204_0012
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1

可以看到产生了3个map任务,说明ORC+Snappy压缩的文件也支持切分。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_orc_snappy_compress

验证是否支持其他压缩格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
首先验证一下是否支持bzip2
创建表:
create external table stu_orc_bzip2_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as orc
location 'hdfs://bigdata01:9000/stu_orc_bzip2_compress'
tblproperties("orc.compress"="BZIP2");

向表中导入数据。
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_orc_bzip2_compress select id,name,city from stu_textfile group by id,name,city;

结果发现最后执行报错了,核心报错信息如下:
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.orc.CompressionKind.BZIP2
at java.lang.Enum.valueOf(Enum.java:238)
at org.apache.orc.CompressionKind.valueOf(CompressionKind.java:25)
at org.apache.orc.OrcFile$WriterOptions.<init>(OrcFile.java:426)
at org.apache.hadoop.hive.ql.io.orc.OrcFile$WriterOptions.<init>(OrcFile.java:112)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.writerOptions(OrcFile.java:317)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getOptions(OrcOutputFormat.java:126)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:184)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:61)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:297)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:282)
... 18 more
1
2
3
4
5
这个错误信息说的是不支持Bzip2,这个错误是在CompressionKind这个类中抛出来的,那也就意味着在这个类里面会对压缩格式进行判断。

那我们来找一下这个类。

到github上搜hive这个项目,选择apache/hive

image-20230618232624402

1
然后选择hive-3.1.2这个分支的代码,因为我们现在使用的是这个版本。

image-20230618232703855

1
在这个项目中搜索CompressionKind这个类即可。

image-20230618232730345

image-20230618232741967

image-20230618232816718

1
所以说现在,ORC主要支持这四种压缩格式,这就说明Hive的文档不是最新的了。还是要以源码为主。

数据存储格式之PARQUET

1
2
3
4
5
6
下面我们来看一下parquet这个数据存储格式。
Parquet是一种新型的与语言无关的,并且不和任何一种数据处理框架绑定的列式存储结构,适配多种语言和组件。

Parquet数据存储格式可以在Hive、Impala、Spark等计算引擎中使用。

Parquet的存储格式是这样的:

image-20230618232928394

1
2
3
4
5
6
7
这个图是来源图官网的,看起来有点乱。
但是大致可以看出来Parquet格式的文件中包含了多个Row Group。
每个Row Group内部包含了多个Column。
Column内部包含了多个Page。
Page内部存储了具体的数据内容。

下面来看一个简化版的,这样看起来会清晰一些。

image-20230618233130578

1
2
3
4
5
Parquet存储格式比ORC更复杂一些,多封装出来一层Page。ORC存储格式是把数据内容直接存储到了Column这一层。

Parquet具体都支持哪些压缩格式呢?

在hive的官网上找不到,需要到github上找到parquet-mr这个项目。

image-20230618233256143

1
再找到里面的parquet-hadoop子项目

image-20230618233314259

1
2
3
4
5
查看这个项目中的README.md文件中的内容

下面来演示一下。

注意:为了避免在学习阶段导致Hive中参数混乱,建议每次验证不同数据存储格式的时候都重新开启一个新的Hive会话。

image-20230618233345219

不使用压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
先不使用压缩,创建一个Parquet存储格式的表。
create external table stu_parquet_none_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as parquet
location 'hdfs://bigdata01:9000/stu_parquet_none_compress'
tblproperties("parquet.compression"="uncompressed");
注意:这里需要在stored as后面指定parquet,并且在tblproperties中指定"parquet.compression"="uncompressed"。

确认一下这个表:
hive (default)> show create table stu_parquet_none_compress;
OK
createtab_stmt
CREATE EXTERNAL TABLE `stu_parquet_none_compress`(
`id` int,
`name` string,
`city` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'field.delim'=',',
'line.delim'='\n',
'serialization.format'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://bigdata01:9000/stu_parquet_none_compress'
TBLPROPERTIES (
'bucketing_version'='2',
'parquet.compression'='uncompressed',
'transient_lastDdlTime'='1819354458')
Time taken: 0.051 seconds, Fetched: 20 row(s)

如果INPUTFORMAT使用的是MapredParquetInputFormat,说明这个表中需要存储Parquet格式的数据。

然后从普通表中查询数据导入到这个Parquet存储格式的表中。
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_parquet_none_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_parquet_none_compress
Found 1 items
-rw-r--r-- 2 root supergroup 2197921560 2027-08-27 16:13 /stu_parquet_none_compress/000000_0

这个结果文件大小为2.05G,比ORC的1.38G要大,所以ORC在存储层面还是占有一定优势的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
验证是否支持切分:
hive (default)> select id,count(*) from stu_parquet_none_compress group by id;
Query ID = root_20270827162003_c66d703c-8662-47cd-8526-abda283ff866
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819348656866_0010, Tracking URL = http://bigdata01:8088/proxy/application_1819348656866_0010/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819348656866_0010
Hadoop job information for Stage-1: number of mappers: 9; number of reducers: 1

可以看到产生了9个map任务,说明Parquet格式的文件是支持切分的。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_parquet_none_compress

使用Gzip压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
接下来使用Gzip压缩格式试验一下。
首先创建表,指定gzip压缩格式。
create external table stu_parquet_gzip_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as parquet
location 'hdfs://bigdata01:9000/stu_parquet_gzip_compress'
tblproperties("parquet.compression"="gzip");

向表中导入数据:
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_parquet_gzip_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_parquet_gzip_compress
Found 1 items
-rw-r--r-- 2 root supergroup 376028689 2027-08-27 16:35 /stu_parquet_gzip_compress/000000_0

此时发现结果数据文件大致在358 M。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
验证是否支持切分:
hive (default)> select id,count(*) from stu_parquet_gzip_compress group by id;
Query ID = root_20270827164309_1f7f866c-b1fa-4e20-bdc9-791d666a2bb0
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819348656866_0013, Tracking URL = http://bigdata01:8088/proxy/application_1819348656866_0013/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819348656866_0013
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1

可以看到产生了2个map任务,说明Parquet格式带压缩的文件也支持切分。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。
[root@bigdata04 ~]# hdfs dfs -rm -r -skipTrash /stu_parquet_gzip_compress

使用Snappy压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
接下来使用Snappy压缩格式试验一下。
首先创建表,指定snappy压缩格式。
create external table stu_parquet_snappy_compress(
id int,
name string,
city string
)row format delimited
fields terminated by ','
lines terminated by '\n'
stored as parquet
location 'hdfs://bigdata01:9000/stu_parquet_snappy_compress'
tblproperties("parquet.compression"="snappy");

向表中导入数据:
hive (default)> set mapreduce.job.reduces=1;
hive (default)> insert into stu_parquet_snappy_compress select id,name,city from stu_textfile group by id,name,city;

产生的结果数据是这样的:
[root@bigdata04 ~]# hdfs dfs -ls /stu_parquet_snappy_compress
Found 1 items
-rw-r--r-- 2 root supergroup 843081901 2027-08-27 16:51 /stu_parquet_snappy_compress/000000_0

此时发现结果数据文件有804M,比Gzip压缩差一些,但是胜在压缩和解压速度快。

验证是否支持切分:
hive (default)> select id,count(*) from stu_parquet_snappy_compress group by id;
Query ID = root_20270827165249_233363a1-56a9-41fc-8f01-6e0716548458
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1819348656866_0016, Tracking URL = http://bigdata01:8088/proxy/application_1819348656866_0016/
Kill Command = /data/soft/hadoop-3.2.0/bin/mapred job -kill job_1819348656866_0016
Hadoop job information for Stage-1: number of mappers: 4; number of reducers: 1

可以看到产生了4个map任务,说明Parquet+Snappy压缩的文件也支持切分。

这个表中的数据验证完毕之后,建议删除一下数据,释放HDFS空间。

数据存储格式总结

数据存储格式之存储空间汇总

1
2
3
前面我们分析了这么多种数据存储格式,具体在工作中应该如何选择呢?

看一下这个表格:

image-20230618234001096

1
2
3
4
原始文本文件大小为:2.09G。

从这个表格中可以看出来,ORC格式在数据存储层面性能最优。
所以在实际工作中,如果希望减少Hive中的数据存储量,建议使用ORC数据存储格式。

数据存储格式之压缩格式选择

1
2
3
4
5
6
在使用ORC数据存储格式的时候,最好再配合上合适的压缩格式,这样可以进一步挖掘ORC的存储性能。

但是ORC自身存储能力已经非常好了,并且也支持切分。所以在选择压缩格式时只需要重点考虑压缩和解压速度就行了。

常见的压缩格式中,Lzo、Snappy的压缩和解压速度是最快的。
针对他们两个进行对比的话,看下面这个表格。

image-20230618234150257

1
2
3
4
5
6
7
从这里可以看出来,Snappy的压缩和解压速度相对更好一些。

注意:这个表格中的数据需要在生产环境下进行测试,虚拟机下测试的不太准确,但是大致规律是一样的。

所以最终建议,在工作中选择ORC+Snappy。

但是有时候在选择数据存储格式的时候还需要考虑它的兼容度,是否支持多种计算引擎。
1
2
3
4
5
6
7
8
假设我希望这一份数据,既可以在Hive中使用,还希望在Impala中使用,这个时候就需要重点考虑这个数据格式是否能满足多平台同时使用。

Impala很早就开始支持Parquet数据格式了,但是不支持ORC数据格式,不过从Impala 3.x版本开始也支持ORC了。
如果你们使用的是Impala2.x版本,还想要支持Hive和Impala查询同一份数据,这个时候就需要选择Parquet了。

相对来说,Parquet存储格式在大数据生态圈中的兼容度是最高的。

Parquet官网上有一句话,说的是Parquet可以应用于Hadoop生态圈中的任何项目中。

本文标题:大数据开发工程师-第八周 第6章 Hive技巧与核心复盘

文章作者:TTYONG

发布时间:2022年02月20日 - 12:02

最后更新:2023年06月21日 - 15:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%85%AB%E5%91%A8-%E7%AC%AC6%E7%AB%A0-Hive%E6%8A%80%E5%B7%A7%E4%B8%8E%E6%A0%B8%E5%BF%83%E5%A4%8D%E7%9B%98.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%