大数据开发工程师-快速上手NoSQL数据库HBase-2


快速上手NoSQL数据库HBase-2

3 深入HBase架构原理

Region概念解释

1
2
3
4
5
6
Region可以翻译为区域,在HBase里面,一个表中的数据,会按照行被横向划分为多个Region。
每个Region,按照存储的Rowkey的最小行键和最大行键指定的,使用区间[start Rowkey,end Rowkey)

解释:
-如果一个文件中数据量很大的时候,从这个大文件中读取数据肯定会比较慢
-打开一个小文件查找数据和打开一个大文件查找数据的效率是不一样的
image-20230621234751043
1
2
3
4
5
6
7
8
在这个图里面,表t1刚创建的时候默认只有1个Region,后来数据量多了以后,Region会自动分裂,这样就产生了多个Region。
那么我们在查询数据的时候,首先要知道数据在哪个Region中,再从Region中读取数据。

如何知道数据在哪个Region中呢?
我们在向表中插入数据的时候,Rowkey是必须指定,不能缺少的,并且Rowkey在存储的时候是有序存储的。
那么我们在定位数据的时候,就可以拿Rowkey到对应的Region中进行对比,每个Region中都会有一个最小Rowkey和最大Rowkey,这样就能很快的判断出来我们要找的数据是不是在这个Region中了。

其实每个Region中的最大Rowkey是有一个地方进行维护的,HBase内部默认提供了一个目录表来维护这个关系。
1
2
如果表t1有两个列族c1和c2,那么在存储的时候,列族c1中的数据是一个独立的文件,列族c2中的数据也会是一个独立的文件,也就是说,每一个列族中的数据在底层存储的时候都是一个单独的文件。
如下图所示,当表有多个Region的时候,每个Region内部的每个列族是一个单独的文件。
image-20230621235252969
1
2
3
4
5
6
7
8
9
所以说我们在设计列族的时候,可以把经常读取的列存储到一个列族中,不经常读取的列放到另一个列族中。
这样我们在读取部分列的数据的时候,就只需要读取对应列族文件中的数据,提高读取效率。

在这里有几个问题:

-如果一个列族中如果有2个列,那么这2个列会存储到2个列族文件中吗?不会的。
-一行记录,会不会分到多个文件中存储? 会
-一个列族中的数据,会不会在多个Region中存储?会
-一个Region中,会不会存储多个列族文件?会

HBase物理架构

image-20230621235550517
1
2
3
4
5
6
主要包含以下内容:

Zookeeper:为HBase集群提供基础服务。
HBase Master:HBase集群的主节点。
HBase Regionserver:HBase集群的从节点。
Client:客户端节点。

Zookeeper

1
2
3
4
5
ZooKeeper为HBase集群提供协调服务,它管理着HMaster和HRegionServer的状态(available/alive等),并且会在HRegionServer宕机时通知给HMaster。

ZooKeeper协调HBase集群所有节点的共享信息,在HMaster和HRegionServer连接到ZooKeeper后会创建Ephemeral(临时)节点,并使用心跳机制监控这个节点的存活状态,如果某个临时节点失效,则HMaster会收到通知,并做相应的处理,这块需要通过Watcher监视器实现。

另外,HMaster通过监听ZooKeeper中的临时节点(默认:/hbase/rs/*)来监控HRegionServer的加入和宕机。在第一个HMaster连接到ZooKeeper时会创建临时节点(默认:/hbasae/master)来表示Active的HMaster,后面加入进来的HMaster则监听该临时节点,如果当前Active的HMaster宕机,则该临时节点消失,因此其他HMaster会得到通知,然后将自身转换成Active的HMaster,在变为Active的HMaster之前,它会先在/hbase/back-masters/下创建自己的临时节点。
1
2
3
4
5
对Zookeeper的作用总结一下,一共有3点:

-Zookeeper维护HBase集群的信息
-HRegionserver启动的时候会在Zookeeper的/hbase/rs下面创建节点信息
-HMaster会在Zookeeper的/hbase下创建master节点 多余的HMaster会监听这个节点,发现这个节点失效的时候,会接管这个角色。
1
2
3
4
5
6
7
8
登录Zookeeper,查看HBase的一些节点信息,HBase默认会在Zookeeper的根节点下面创建hbase节点
[zk: localhost:2181(CONNECTED) 0] ls /hbase
[backup-masters, draining, flush-table-proc, hbaseid, master, master-maintenance, meta-region-server, namespace, online-snapshot, rs, running, splitWAL, switch, table]

在Zookeeper中查看HBase的从节点信息,这里面显示的内容表示目前HBase集群有2个从节点bigdata02和bigdata03

[zk: localhost:2181(CONNECTED) 1] ls /hbase/rs
[bigdata02,16020,1777443334673, bigdata03,16020,1777443333539]

HMaster

1
2
3
4
5
6
7
8
HMaster是HBase集群的主节点,HBase集群支持多个HMaster节点,可以实现HA。
通过ZooKeeper的选举机制保证同时只有一个HMaster处于Active状态,其他的HMaster处于备份状态。一般情况下会启动两个HMaster,备份状态的HMaster会定期和Active HMaster通信以获取其最新状态,从而保证它是实时更新的,因此如果启动了多个HMaster反而增加了Active HMaster的负担。

HMaster主要有以下职责:
-管理HRegionServer,实现其负载均衡。
-管理和分配Region,比如在Region分裂时分配新的Region;在HRegionServer退出时迁移里面的Region到其他HRegionServer上。
-管理namespace和table的元数据(这些元数据实际存储在HDFS上面)
-权限控制(ACL)

HRegionserver

1
2
3
HRegionserver是HBase集群的从节点。

HRegionServer一般建议和DataNode部署在同一台机器上,这样可以实现数据的本地化特性,因为DataNode是存储数据的,HBase的数据也是存储在HDFS上的,HRegionServer就是管理数据的,所以这样的话可以尽量保证HRegionServer读取本地的数据,只有磁盘IO,节省了网络IO。

HBase架构详解

image-20230622000928203

1
2
3
4
5
图中虚线下面是HDFS部分 上面是HBase的部分
Client(客户端)想要连接HBase的时候,需要先连接Zookeeper。
首先会找Zookeeper中/hbase/meta-region-server这个节点,这个节点里面保存了HBase中meta表的数据(Region)所在的Regionserve节点信息。
在Zookeeper执行get命令查看节点上的信息,显示出来的有乱码,不过大致可以看出来,里面显示的是bigdata03,也就意味着meta表的Region是在bigdata03这个节点上的。
注意:大家在查看的时候显示的不一定是bigdata03节点,也可能是其他的节点。
1
2
3
4
[zk: localhost:2181(CONNECTED) 3] get /hbase/meta-region-server
�master:16000)��K,}�PBUF

bigdata03_x0010_�}�ªڵ4_x0010_
1
2
我们也可以通过HBase的UI界面进行查看验证。
先在系统表中找到这个meta表

image-20230622001256147

1
然后点击进去,显示的这个表确实是在bigdata03这个节点上面

image-20230622001323492

1
我们可以在这里点击bigdata03:16030这个链接,然后进入到这个界面

image-20230622001458992

1
2
3
4
5
6
7
8
9
10
最终会发现meta表的数据确实就是在bigdata03这个节点上面的。

客户端到Zookeeper中找到这个信息以后,就把这个信息加载到缓存中了,这样就不用每次都去重新加载了

meta表中存储了所有表的相关信息,可能会有很多,都加载到内存的话可能会扛不住的,所以客户端并不会加载meta表中的所有数据,只会把meta表中我们目前需要的相关数据加载到内存。

meta表里面存储的有HBase中所有表对应的RegionServer节点信息

scan ‘hbase:meta’ 可以看到这个meta表里面的详细信息。
所以客户端这个时候其实就可以获取到表对应RegionServer的IP和端口信息了,通过RPC机制就可以通信了。

详解HRegionServer

1
下面就来详细看一下HRegionServer里面的内容
image-20230622001912173
1
2
3
4
5
6
HRegionServer里面有2块内容,一个是HLog ,另一个是HRegion(其实就是我们前面分析的Region,是同一个意思,Region是HRegion的简称)。

在一个HRegionServer里面,HLog只有一个,HRegion会有多个,这个框后面是有三个点,表示是多个的意思。
HLog是负责记录日志的,针对这个HRegionServer中的所有写操作,包括put、delete等操作,只要是会对数据产生变化的操作,都会记录到这个日志中,再把数据写到对应的HRegion中。

HRegion就是负责存储实际数据了。
1
看一下HRegion内部:

image-20230622002106137

1
2
3
4
5
6
7
每一个Store对应一个列族,所以一个HRegion里面可能会有多个Store。
向HRegionserver中写数据的时候,会先写HLog,然后在把数据写入HRegion的时候,会根据指定的列族信息写入到不同的Store里面,我们之前在向表中put数据的时候,表名和列族名称都是必须要指定的。

Store里面包含两部分:MemStore和StoreFile。
用户写入的数据首先会放入MemStore【基于内存的Store】里面,当这个MemStore写满了以后,会把数据持久化到StoreFile中,每一次内存满了持久化的时候都会生成一个StoreFile,StoreFile底层对应的是一个HFile文件。
HFile文件会通过下面的DFS Client写入到HDFS中。
最终HLog和HFile都是用DFS Client写入到HDFS中的。

HBase物理存储模型

1
2
HBase中表的数据是存储在Region中的,表中的数据会越来越多,Region就会分裂,分裂出来的多个Region会分布到多个节点上面,因为单台机器的存储能力是有限的,这样对后期数据并行读取也有好处,有利于扩展。
这样就可以保证一个表能存储海量数据,存放Region的服务器称之为Region Server。
image-20230622002810617

WAL(Write-Ahead Logging)预写日志系统

1
2
3
4
WAL最重要的作用是灾难恢复。和MySQL的Binlog类似,它记录所有的数据改动。一旦服务器崩溃,通过重放log可以恢复崩溃之前的数据。这也就意味如果写入WAL失败,整个写入操作将认为失败。
HBase中,HLog是WAL的实现类。一个HRegionServer对应一个HLog实例。

WAL数据是存储在HDFS上面的,点进去可以看到是一个一个的文件。
image-20230622003106817

HFile介绍

1
2
3
4
HFile是HBase中重要的一个存在,可以说是HBase架构中最小的结构,HBase的底层数据都在HFile中。
HFile从根本上来说是HDFS中的文件,只是它有自己特殊的格式。

HFile文件由6部分组成:

image-20230622003528698

1
2
3
4
5
6
Data(数据块): 保存表中的数据(key-value的形式),这部分可以被压缩,每个数据块都有一个Magic头,负责存储偏移量和第一个Key。
Meta(元数据块):存储用户自定义的key-value。
File Info:定长,记录了文件的一些元信息,例如:AVG_KEY_LEN,AVG_VALUE_LEN,LAST_KEY等
Data Index(数据块索引):记录了每个数据块(Data)的起始索引。
Meta Index(元数据块索引):记录了每个元数据块(Meta)的起始索引。
Trailer:定长,用于指向其他数据块的起始点。

从读/写流程角度理解Hbase架构

写流程

image-20230624100622100

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
总结下来:
--1. 具体流程:
1. Client先访问zookeeper,请求hbase:meta元数据位于哪个Region Server中。
2. zookeeper返回hbase:meta所在的Region Server地址
3. client访问对应的Region Server,请求hbase:meta元数据信息
4. Region Server返回meta元数据信息
5. client根据写请求的namespace:table/rowkey,查询出目标数据位于哪个Region Server中的哪个Region中。
并将该table的region信息以及meta表的位置信息缓存在客户端的meta cache,方便下次访问。
6. 与目标Region Server进行通讯;
7. 将数据顺序写入(追加)到WAL;
8. WAL将数据写入对应的MemStore,数据会在MemStore进行排序;
9. 完成写数据操作以后,向客户端发送ack;
10. 等达到MemStore的刷写时机后,将数据刷写到HFile。

-- 2. hbase:meta包含哪些信息呢?
命令:scan ‘hbase:meta’
a、rowkey: test,,1592911245995.c42a3b247c7ed78f071
1) test:namespace,命名空间
2) ,,:startkey endkey,起止的rowkey
3) 1592911245995:time stamp ,时间戳
4) .c42a3b247c7ed78f071:前面3个参数的MD5值。
b、column=info:regioninfo : region的信息,
value={ENCODED => c42a3b247c7ed78f071f60721bad78ad, NAME => 'test,,
f60721bad78ad.1592911245995.c42a3b247c7ed78f071f60721bad78ad.', STARTKEY => '', ENDKEY => ''}
c、column=info:seqnumDuringOpen :序列号,value=\x00\x00\x00\x00\x00\x00\x00\x02
d、column=info:server : 该region所在的server,value=hadoop106:16020
e、column=info:serverstartcode : 该region所在的server创建的时间戳,value=1592887276902
f、column=info:sn : value=hadoop106,16020,1592887276902
g、column=info:state :该region的状态,value=OPEN

读流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Hbase的读比写慢,因为读的时候,一定要扫描磁盘.

思考:读数据
读数据首先的找到Region所对应的RegionServer.找RegionServer就得找Meta表->找Zookeeper

思考因为1.HDFS不支持随机写和2.Hbase写入数据时候,可以指定时间戳的原因,导致HBase中存放数据的位置有很多,那我们读哪些位置呢?
答:全部都读.

全部都读的话,同一个RowKey会有很多版本,怎么呢?
答:merge合并

还真么多读的位置,还要merge,HBase的读取效率的多慢啊?
HBase肯定做了优化啊.做了哪些优化呢?
1.Block Cache
2.读HDFS时.会根据时间范围,RowKey范围,以及布隆过滤器进行优化.

读的位置有很多:
1.并不是先读内存,在读磁盘.因为hbase写入数据时候,可以指定时间戳.
例如:先写入数据A,已经被flush到磁盘.后写入数据B并指定时间戳小. 假如先读内存,导致读出来B====>错误数据
2.因此读MemStore,Hfile(Block cache没扫描过的Hfile),Block cache.3个都读
3.HDFS时.有很多优化.被读的HFile,都是Block Cache没缓存的数据.

BloomFilter布隆过滤器

1
2
3
4
5
6
7
8
9
布隆过滤器是一种比较巧妙的概率型数据结构,可以用来告诉你 “某样东西一定不存在或者可能存在”。
也就是说它告诉你某样东西不存在的话就一定不存在。
如果它告诉你某样东西存在,也可能实际是不存在的。

布隆过滤器是hbase中的高级功能,它能够减少特定访问模式(get/scan)下的查询时间。进而提高HBase集群的吞吐率。

当我们随机查询数据时,如果采用HBase的块索引机制,HBase会加载很多块文件。如果采用布隆过滤器后,它能够准确判断该HFile的所有数据块中,是否含有我们查询的数据,从而大大减少不必要的块加载,进而提高HBase集群的吞吐率。

对于HBase而言,当我们选择采用布隆过滤器之后,HBase会在生成HFile时包含一份布隆过滤器结构的数据,开启布隆过滤器会有一定的存储及内存开销。但是在大多数情况下,这些负担相对于布隆过滤器带来的好处来说是可以接受的。

image-20230624101548341

image-20230624101614330

1
2
3
4
5
6
7
8
9
优化:
1.Hfile存储的是一定时间范围.一定RoWkey范围的.所以读数据时候会根据RowKey/时间范围过滤,通过时间范围/rowkey范围不在里边的storefile不会被扫描.
2.布隆过滤器:告诉Hbase,这个文件中没有你想要的数据.怎么实现的呢?
布隆过滤器:只能告诉你这个文件中不存在你想要的数据,不能告诉你是否存在.
数据在往文件里边存储时.会经过几次算法计算.将维护的数组位置表示为1;
读数据时,就不需要扫描一遍全文件.只需要在经过相同算法计算,看对应数组为位置是否全部为1.
如果全部为1==>可能存在这个数据 否则:一定不存在
(MemetoreHfile被刷写前,会进行排序.所以,RowKey有序,但是并不一定连续)
4.merge:返回版本大的数据

HFile compaction(合并)机制

1
2
3
4
5
6
7
8
9
10
11
12
当MemStore超过阀值的时候,就会持久化生成一个(StoreFile)HFile。
因此随着数据不断写入,HFile的数量将会越来越多,HFile数量过多会降低读性能,因为每次查询数据都需要加载多个HFile文件。为了避免对读性能的影响,可以对这些HFile进行合并操作,把多个HFile合并成一个HFile。

合并操作需要对HBase中的数据进行多次的重新读写,这个过程会产生大量的IO。因此可以发现合并机制的本质就是以IO操作换取后续读性能的提高。

合并操作分为major(大合并)和minor(小合并)两种。

minor(小合并):只做部分文件的合并操作,生成新文件设置成激活状态,然后删除老的HFile文件(标记为删除状态,在major合并时删除)。小合并的过程一般较快,而且IO相对较低。

major(大合并):对Region下(同列族)的所有HFile执行合并操作,最终的结果是合并出一个HFile文件。在生成新的HFile时,会忽略掉已经标记为删除的数据、ttl过期的数据、版本超过限定的数据。大合并会产生大量的IO操作,对HBase的读写性能产生较大影响。

注意:一般情况下,大合并会持续很长时间,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会关闭自动触发大合并功能,改为手动在业务低峰期触发。

Region Split(分裂)机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
前面我们分析了HFile文件的合并机制,当HFile文件合并多次之后,会导致Region中的数据过大,此时就需要涉及Region的分裂机制了。
当HBase中的一个表刚被创建的时候,HBase默认只会分配一个Region给这个表。也就是说这个时候,所有的读写请求都会访问到同一个RegionServer中的同一个Region中,出现读写热点问题。
并且当Region管理的数据量过多,或HFile文件较大时,都会影响性能。

为了达到负载均衡,当Region达到一定的大小时就会将一个Region分裂成两个新的子Region,并对父 Region进行清除处理。

HMaster会根据Balance策略,重新分配Region所属的RegionServer,最大化的发挥分布式系统的优点。

触发Region Split的条件:
-ConstantSizeRegionSplitPolicy (0.94版本前):
一个Region中最大HFile文件的大小大于设置的阈值(hbase.hregion.max.filesize)之后才会触发切分,HFile文件大小为压缩后的文件大小(针对启用压缩的场景),默认文件大小的阈值为10G。
这种策略简单粗暴,但是弊端相当大。
阈值设置偏大的话,对大表友好,小表可能不会触发分裂,极端情况下小表可能就只会有一个Region。
阈值设置偏小的话,对小表友好,但一个大表可能会在集群中产生大量的Region,对于集群管理来说不是好事。

-IncreasingToUpperBoundRegionSplitPolicy (0.94版本~2.x版本默认切分策略):
一个Region中最大HFile文件的大小大于设置的阈值就会触发切分,区别是这个阈值并不像 ConstantSizeRegionSplitPolicy是一个固定的值,这里的阈值是会不断调整的。调整规则和Region所属表在当前RegionServer上的Region个数有关系。
公式:调整后的阈值 = Region个数的3次方 * flush_size * 2
注意:这里的阈值不会无限增大,会通过hbase.hregion.max.filesize来进行限制,不能超过这个参数的大小。
这种策略能够自适应大小表,集群规模大的情况下,对大表很优秀,对小表会产生大量小Region(比第一种策略好一些)。

Region Balance(负载均衡策略)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Region分裂之后就会涉及到Region的负载均衡。

HBase的HMaster进程会自动根据指定策略挑选出一些Region,并将这些Region分配到负载比较低的RegionServer上。

由于HBase的所有数据都是写入到HDFS文件系统中的, 因此HBase的Region移动其实是非常轻量级。在做Region移动的时候,保持这个Region对应的HDFS文件位置不变,只需要将Region的元数据分配到对应的RegionServer中即可。

官方目前支持两种挑选Region的策略:
DefaultLoadBalancer和StochasticLoadBalancer。
-DefaultLoadBalancer:这种策略能够保证每个RegionServer中的Region个数基本上都相等。
-StochasticLoadBalancer:这种策略非常复杂,简单来讲是一种综合权衡6个因素的均衡策略。

采用6个因素加权的方式算出一个代价值,这个代价值用来评估当前Region分布是否均衡,越均衡代价值越低。
-每台服务器读请求数(ReadRequestCostFunction)
-每台服务器写请求数(WriteRequestCostFunction)
-Region个数(RegionCountSkewCostFunction)
-移动代价(MoveCostFunction)
-数据Locality(TableSkewCostFunction)
-每张表占据RegionServer中Region个数上限(LocalityCostFunction)

4 HBase高级用法

列族高级设置

生存时间(TTL)

1
应用系统经常需要从数据库里删除老数据,配置此项,可使数据增加生命周期,超过该配置时间的数据,将会在大合并时“被删除”。(单位:秒)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
hbase(main):009:0> create 't3', {NAME => 'cf1', TTL => '18000'}
Created table t3
Took 1.2689 seconds
=> Hbase::Table - t3
hbase(main):053:0> desc 't3'
Table t3 is ENABLED
t3
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', VERSIONS => '1', NEW_VERSION_BEHAVIOR => 'false', KEEP
_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => '1800
0 SECONDS (5 HOURS)', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', B
LOOMFILTER => 'ROW', IN_MEMORY => 'false', COMPRESSION => 'NONE', BLOC
KCACHE => 'true', BLOCKSIZE => '65536'}

1 row(s)
Quota is disabled
Took 0.0359 seconds

因此,HBase 中数据被删除的真正时机是在发生 Major Compaction 的时候。如果想要立即删除数据,可以手动执行 Major Compaction 命令;如果想要保留数据的历史版本,可以设置 KEEP_DELETED_CELLS=true 参数。

版本数

1
在0.96的版本之前默认每个列族是3个version, 0.96之后每个列族是1个version,在大合并时,会遗弃过期的版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hbase(main):010:0> create 't4', {NAME => 'cf1', VERSIONS => 3}
Created table t4
Took 1.2638 seconds
=> Hbase::Table - t4
hbase(main):011:0> desc 't4'
Table t4 is ENABLED
t4
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', VERSIONS => '3', EVICT_BLOCKS_ON_CLOSE => 'false', NEW
_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DAT
A_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER'
, MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW',
CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_O
N_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION =>
'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}

压缩

1
2
3
4
HFile可以被压缩并存放在HDFS上。这有助于节省硬盘IO,但是读写数据时压缩和解压缩会抬高CPU利用率。
压缩是表定义的一部分,可以在建表或修改表结构时设定。建议打开表的压缩,除非你确定不会从压缩中受益。只有在数据不能被压缩或者因为某种原因服务器的CPU利用率有限制要求的情况下,有可能会关闭压缩特性。

HBase可以使用多种压缩编码,包括LZO、SNAPPY和GZIP (hadoop,hive,impala也有压缩概念)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hbase(main):012:0> create 't5',{NAME => 'cf1', COMPRESSION => 'SNAPPY'}
Created table t5
Took 1.3364 seconds
=> Hbase::Table - t5
hbase(main):013:0> desc 't5'
Table t5 is ENABLED
t5
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW
_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DAT
A_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER'
, MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW',
CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_O
N_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION =>
'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}

数据块(BLOCKSIZE)大小的配置

1
2
3
随机查询:数据块越小,索引越大,查找性能更好
顺序查询:更好的顺序扫描,需要更大的数据块
所以在使用的时候根据业务需求来判断是随机查询需求多还是顺序查询需求多,根据具体的场景而定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hbase(main):014:0> create 't6',{NAME => 'cf1', BLOCKSIZE => '65537'}
Created table t6
Took 1.2451 seconds
=> Hbase::Table - t6
hbase(main):015:0> desc 't6'
Table t6 is ENABLED
t6
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW
_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DAT
A_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER'
, MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW',
CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_O
N_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION =>
'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65537'}

HBase 表定义中的 blocksize 是指 HBase 存储数据的最小单元,也是 HBase 缓存数据的基本单位。HBase 数据是存储在 HFile 文件中的,每个 HFile 文件由多个 block 组成,每个 block 包含一定数量的 KeyValue 对。当 HBase 读取数据时,会从 HFile 中读取一个或多个 block 到内存中,并缓存在 BlockCache 中,以提高后续读取的性能。

blocksize 的大小会影响 HBase 的读写效率和内存占用。blocksize 太小,会导致 HFile 文件数量过多,增加索引和元数据的开销;blocksize 太大,会导致读取数据时需要读取更多的无关数据,浪费内存和网络资源。因此,需要根据数据的特点和访问模式来调整 blocksize 的大小,以达到最佳的性能。

HBase 的默认 blocksize 是 64 KB,但是可以针对不同的列族进行配置。一般来说,如果列族中的数据大小比较均匀,并且访问模式比较随机,可以使用较小的 blocksize;如果列族中的数据大小比较不均匀,并且访问模式比较顺序,可以使用较大的 blocksize。
1
2
3
4
5
HBase 的数据块和 HDFS 的数据块是不同的概念,它们用于不同的目的。HBase 的数据块是 HBase 存储数据的最小单元,也是 HBase 缓存数据的基本单位;HDFS 的数据块是 HDFS 存储数据的最小单元,也是 HDFS 分布数据的基本单位。HBase 的数据块通常比 HDFS 的数据块要小得多,因为 HBase 需要支持快速的随机访问,而 HDFS 需要支持高效的顺序访问。

HBase 数据是存储在 HFile 文件中的,每个 HFile 文件由多个 HBase 数据块组成,每个 HBase 数据块包含一定数量的 KeyValue 对。HFile 文件又是存储在 HDFS 上的,每个 HFile 文件由多个 HDFS 数据块组成,每个 HDFS 数据块包含一定数量的字节。当 HBase 读取数据时,会先从内存中的 BlockCache 或 MemStore 中查找,如果没有找到,会从 HDFS 中读取一个或多个 HBase 数据块,并缓存在 BlockCache 中。当 HBase 写入数据时,会先写入内存中的 MemStore 和 WAL 中,当 MemStore 满了时,会将数据刷写到一个新的 HFile 文件中,并写入多个 HDFS 数据块。

因此,HBase 读取一个数据块时,并不需要读取一个完整的 HDFS 数据块,而是可以根据索引和偏移量直接定位到所需的 HBase 数据块,并从 HDFS 中读取该部分数据。这样可以减少磁盘和网络的开销,提高随机访问的性能。
1
2
3
4
5
HFile 和 HDFS 数据块相比,一般来说 HFile 更大,因为 HFile 是由多个 HBase 数据块组成的,而 HBase 数据块又是存储在 HDFS 数据块中的。HFile 的大小取决于 HBase 数据的量和 MemStore 的刷新策略,通常是几百 MB 或者几 GB 的级别;HDFS 数据块的大小取决于 HDFS 的配置,通常是 64 MB 或者 128 MB 的级别。

HFile 和 HDFS 数据块的大小会影响 HBase 的读写性能和存储效率。HFile 太大,会导致读取数据时需要扫描更多的索引和数据块,增加磁盘和网络的开销;HFile 太小,会导致 HFile 文件数量过多,增加元数据和管理的开销。HDFS 数据块太大,会导致读取数据时需要读取更多的无关数据,浪费内存和网络资源;HDFS 数据块太小,会导致 HDFS 文件分片过多,增加 NameNode 的负担。

因此,需要根据数据的特点和访问模式来调整 HFile 和 HDFS 数据块的大小,以达到最佳的性能。一般来说,如果数据比较稀疏,并且访问模式比较随机,可以使用较小的 HFile 和 HDFS 数据块;如果数据比较密集,并且访问模式比较顺序,可以使用较大的 HFile 和 HDFS 数据块。
1
2
3
4
5
6
7
8
9
10
11
12
13
根据我的搜索结果,hbase数据块的大小是可以在建表语句中通过参数BlockSize指定的,而memstore和blockcache的大小是可以在配置文件中设置的。因此,它们之间不一定相等,也不一定有固定的比例关系。12

memstore是hbase的写缓存,用于存储最近更新的数据,blockcache是hbase的读缓存,用于存储最近访问的数据块。12

hbase提供了几种blockcache方案,有LruBlockCache, SlabCache, BucketCache和ExternalBlockCache。123

LruBlockCache是默认的方案,它将所有数据都放入JVM堆中,按照访问频率分为三个优先级队列:single, multi和in memory。12

SlabCache是0.92版本提供的方案,它使用堆外内存存储固定大小的block,但是在1.0版本后被废弃了。12

BucketCache是0.95版本提供的方案,它支持多种缓存方式和多种不同大小的bucket,以适应不同大小的block size。12

ExternalBlockCache是1.10版本提供的方案,它使用外部进程管理缓存数据,以减少JVM内存压力。

数据块缓存(BLOCKCACHE)

1
2
如果一张表或表里的某个列族只被顺序化扫描访问或者很少被访问,这个时候就算Get或Scan花费时间是否有点儿长,你也不会很在意。在这种情况下,你可以选择关闭那些列族的缓存。
如果你只是执行很多顺序化扫描,你会多次倒腾缓存,并且可能会滥用缓存把应该放进缓存获得性能提升的数据给排挤出去。如果关闭缓存,不仅可以避免上述情况发生,而且还可以让出更多缓存给其他表和同一个表的其他列族使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hbase(main):016:0> create 't7',{NAME => 'cf1', BLOCKCACHE => 'false'}
Created table t7
Took 2.2954 seconds
=> Hbase::Table - t7
hbase(main):017:0> desc 't7'
Table t7 is ENABLED
t7
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW
_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DAT
A_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER'
, MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW',
CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_O
N_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION =>
'NONE', BLOCKCACHE => 'false', BLOCKSIZE => '65536'}

布隆过滤器(Bloom filters)

1
2
3
4
5
6
HBase中存储额外的索引层次会占用额外的空间。布隆过滤器随着它们的索引对象的数据增长而增长,所以行级布隆过滤器比列标识符级布隆过滤器占用空间要少。当空间不是问题的时候,它们可以帮助你榨干系统的性能潜力。

BLOOMFILTER参数的默认值是ROW,表示是行级布隆过滤器。
使用行级布隆过滤器需要设置为ROW,使用列标识符级布隆过滤器需要设置为ROWCOL。
行级布隆过滤器在数据块里检查特定行键是否不存在,列标识符级布隆过滤器检查行和列标识符联合体是否不存在。
ROWCOL布隆过滤器的开销要高于ROW布隆过滤器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hbase(main):018:0> create 't8',{NAME => 'cf1', BLOOMFILTER => 'ROWCOL'}
Created table t8
Took 1.2484 seconds
=> Hbase::Table - t8
hbase(main):019:0> desc 't8'
Table t8 is ENABLED
t8
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW
_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DAT
A_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER'
, MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROWCO
L', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOM
S_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION
=> 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}

Scan (全表扫描)

1
2
3
4
5
6
7
HBase中的Scan操作,类似于SQL中的select * from ...
例子:

scan 'hbase:meta'
scan 'hbase:meta',{COLUMNS => 'info:regioninfo'}
scan 't1',{COLUMNS=>'c1',TIMERANGE=>[1303668804,1303668904]}
scan 't1',{REVERSED=>true}

Scan的Java API用法

1
2
3
4
5
6
7
8
9
10
11
12
13
Scan 在Java API中支持以下用法:

scan.addFamily(); //指定列族
scan.addColumn(); //指定列,如果没有调用任何addFamily或Column,会返回所有的columns;
scan.readAllVersions(); //读取所有版本数据。
scan.readVersions(3); //读取最新3个版本的数据
scan.setTimeRange(); //指定最大的时间戳和最小的时间戳,只有在此范围内的cell才能被获取.
scan.setTimeStamp(); //指定时间戳
scan.setFilter(); //指定Filter来过滤掉不需要的信息
scan.withStartRow(); //指定开始的行。如果不指定,则从表头开始
scan.withStopRow(); //指定结束的行(不含此行)
scan.setBatch(); //指定最多返回的Cell数目。用于防止一行中有过多的数据,导致OutofMemory错误。
scan.setCaching(); //指定scan底层每次连接返回的数据条数,默认值为1,适当调大可以提高查询性能,设置太大会比较耗内存
常见的Filter
1
在进行Scan的时候,可以添加Filter实现数据过滤
RowFilter
1
2
3
4
5
6
7
8
9
10
11
12
RowFilter:对Rowkey进行过滤。

//小于等于 x
Filter filter = new RowFilter(CompareOperator.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes(“x”)));
//正则 以x结尾
Filter filter = new RowFilter(CompareOperator.EQUAL,new RegexStringComparator(".*x"));
//包含 x
Filter filter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(“x”));
//开头 x
Filter filter = new RowFilter(CompareOperator.EQUAL, new BinaryPrefixComparator(Bytes.toBytes(“x”)));

CompareOperator.其他比较参数
1
2
3
4
5
6
7
LESS   				小于   
LESS_OR_EQUAL 小于等于
EQUAL 等于
NOT_EQUAL 不等于
GREATER_OR_EQUAL 大于等于
GREATER 大于
NO_OP 排除所有
PrefixFilter
1
2
PrefixFilter:筛选出具有特定前缀的行键的数据。
Filter pf = new PrefixFilter(Bytes.toBytes(“前缀”));
ValueFilter
1
2
ValueFilter:按照具体的值来筛选列中的数据,这会把一行中值不能满足的列过滤掉
Filter vf = new ValueFilter(CompareOperator.EQUAL, new SubstringComparator(“ROW2_QUAL1”));
Scan和Filter结合的案例
1
注意:在执行下面代码之前,先创建表,初始化一批数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
hbase(main):001:0> create 's1','c1','c2'
Created table s1
Took 2.2063 seconds
=> Hbase::Table - s1
hbase(main):026:0> put 's1','a','c1:name','zs'
hbase(main):026:0> put 's1','a','c1:age','18'
hbase(main):026:0> put 's1','a','c2:score','99'

hbase(main):026:0> put 's1','b','c1:name','jack'
hbase(main):026:0> put 's1','b','c1:age','21'
hbase(main):026:0> put 's1','b','c2:score','85'

hbase(main):026:0> put 's1','c','c1:name','tom'
hbase(main):026:0> put 's1','c','c1:age','31'
hbase(main):026:0> put 's1','c','c2:score','79'

hbase(main):026:0> put 's1','d','c1:name','lili'
hbase(main):026:0> put 's1','d','c1:age','27'
hbase(main):026:0> put 's1','d','c2:score','65'

hbase(main):026:0> put 's1','e','c1:name','ww'
hbase(main):026:0> put 's1','e','c1:age','35'
hbase(main):026:0> put 's1','e','c2:score','100'

hbase(main):026:0> put 's1','f','c1:name','jessic'
hbase(main):026:0> put 's1','f','c1:age','12'
hbase(main):026:0> put 's1','f','c2:score','77'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.imooc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.List;

/**
* 全表扫描Scan+Filter
*
* 注意:首先需要创建表s1,然后初始化一批数据
* Created by xuwei
*/
public class HBaseScanFilter {
public static void main(String[] args) throws Exception{
//获取配置
Configuration conf = HBaseConfiguration.create();
//指定HBase使用的zk的地址,多个都逗号隔开
conf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181");
//指定HBase在hdfs上的根目录
conf.set("hbase.rootdir","hdfs://bigdata01:9000/hbase");
//创建HBase连接,负责对HBase中数据的增删改查(DML操作)
Connection conn = ConnectionFactory.createConnection(conf);
//获取Table对象,指定要操作的表名,表需要提前创建好
Table table = conn.getTable(TableName.valueOf("s1"));

Scan scan = new Scan();
//范围查询:指定查询区间,提高查询性能
//这是一个左闭右开的区间,也就是查询的结果中包含左边的,不包含右边的
scan.withStartRow(Bytes.toBytes("a"));
scan.withStopRow(Bytes.toBytes("f"));

//添加Filter对数据进行过滤:使用RowFilter进行过滤,获取Rowkey小于等于d的数据
Filter filter = new RowFilter(CompareOperator.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("d")));
scan.setFilter(filter);

//获取查询结果
ResultScanner scanner = table.getScanner(scan);
//迭代查询结果
for (Result result: scanner) {
List<Cell> cells = result.listCells();
//RowKey
byte[] row_key = result.getRow();
for (Cell cell: cells) {
//注意:下面获取的信息都是字节类型的,可以通过new String(bytes)转为字符串
//列族
byte[] famaily_bytes = CellUtil.cloneFamily(cell);
//列
byte[] column_bytes = CellUtil.cloneQualifier(cell);
//值
byte[] value_bytes = CellUtil.cloneValue(cell);
System.out.println("Rowkey:"+new String(row_key)+",列族:"+new String(famaily_bytes)+",列:"+new String(column_bytes)+",值:"+new String(value_bytes));
}
System.out.println("================================================================");
}

//关闭连接
table.close();
conn.close();


}
}

批量导入

1
2
3
4
5
6
批量导入两种方式:

-利用MapReduce中封装好的方法。在map阶段,把数据封装成Put操作,直接将数据入库。
-利用Bulkload。首先使用MapReduce直接生成HFile文件,然后再通过Bulkload将HFile文件直接加载到表中。

Bulkload的优势:通过MR生成HBase底层HFile文件,直接加载到表中,省去了大部分的RPC和写过程。

批量导入之 MapReduce

1
2
3
4
5
首先初始化输入数据:
在linux中创建文件:hbase_import.dat
内容如下:

注意:字段分隔符为制表符
1
2
3
4
5
6
7
[root@bigdata04 soft]# vi hbase_import.dat
a c1 name zs
a c1 age 18
b c1 name ls
b c1 age 29
c c1 name ww
c c1 age 31
1
2
3
4
5
6
7
8
把hbase_import.dat上传到HDFS中
[root@bigdata04 soft]# hdfs dfs -put hbase_import.dat /

在HBase中创建需要用到的表batch1
hbase(main):027:0> create 'batch1','c1'
Created table batch1
Took 1.3600 seconds
=> Hbase::Table - batch1
1
此时项目的pom.xml文件中除了添加hbase-client的依赖,还需要添加hadoop-client和hbase-mapreduce的依赖,否则代码报错
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.2.7</version>
</dependency>
1
详细代码如下:
BatchImportMR
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.imooc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
* 批量导入:
* 1:利用MapReduce中封装好的方法。
* 在map阶段,把数据封装成Put操作,直接将数据入库
*
* 注意:需要提前创建表batch1
* create 'batch1','c1'
*
* Created by xuwei
*/
public class BatchImportMR {

public static class BatchImportMapper extends Mapper<LongWritable, Text, NullWritable, Put>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
if(strs.length==4){
String rowkey = strs[0];
String columnFamily = strs[1];
String name = strs[2];
String val = strs[3];
Put put = new Put(rowkey.getBytes());
put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes());
context.write(NullWritable.get(),put);
}
}
}

public static void main(String[] args) throws Exception{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

String inPath = args[0];
String outTableName = args[1];
//设置属性对应参数
Configuration conf = new Configuration();
conf.set("hbase.table.name",outTableName);
conf.set("hbase.zookeeper.quorum","bigdata01:2181,bigdata02:2181,bigdata03:2181");

//封装Job
Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName);
job.setJarByClass(BatchImportMR.class);

//指定输入路径
FileInputFormat.setInputPaths(job,new Path(inPath));

//指定map相关的代码
job.setMapperClass(BatchImportMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);

TableMapReduceUtil.initTableReducerJob(outTableName,null,job);
TableMapReduceUtil.addDependencyJars(job);

//禁用Reduce
job.setNumReduceTasks(0);

job.waitForCompletion(true);
}
}
1
2
3
4
对代码打jar包:
在打jar包之前,需要对pom.xml中的依赖添加<scope>provided</scope>配置

注意:hbase-client和hbase-mapreduce不能设置provided,这两个依赖需要打进jar包里面,否则会提示找不到对应的类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.7</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.2.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
还需要添加maven打包配置

<build>
<plugins>
<!-- compiler插件, 设定JDK版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
执行打jar包命令
C:\Users\yehua>d:

D:\>cd IdeaProjects\db_hbase

D:\IdeaProjects\db_hbase>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building db_hbase 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.4.1:clean (default-clean) @ db_hbase ---
[INFO] Deleting D:\IdeaProjects\db_hbase\target
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ db_hbase ---
[INFO] Building jar: D:\IdeaProjects\db_hbase\target\db_hbase-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.2-beta-5:single (make-assembly) @ db_hbase ---
[INFO] Building jar: D:\IdeaProjects\db_hbase\target\db_hbase-1.0-SNAPSHOT-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.491s
[INFO] Finished at: Sun Nov 22 12:01:56 CST 2020
[INFO] Final Memory: 99M/356M
[INFO] ------------------------------------------------------------------------

将生成的db_hbase-1.0-SNAPSHOT-jar-with-dependencies.jar上传到bigdata04机器上
然后向集群中提交此MapReduce任务

注意:需要确保Hadoop集群、Zookeeper集群、HBase集群可以正常工作
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hbase-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.hbase.BatchImportMR hdfs://bigdata01:9000/hbase_import.dat batch1

执行成功之后,查询HBase中batch1表中的结果如下:

hbase(main):013:0> scan 'batch1'
ROW COLUMN+CELL
a column=c1:age, timestamp=1778305406350, value=18
a column=c1:name, timestamp=1778305406350, value=zs
b column=c1:age, timestamp=1778305406350, value=29
b column=c1:name, timestamp=1778305406350, value=ls
c column=c1:age, timestamp=1778305406350, value=31
c column=c1:name, timestamp=1778305406350, value=ww
3 row(s)
Took 0.0455 seconds

批量导入之 BulkLoad

image-20230622152857361

1
2
3
4
5
6
7
8
测试数据继续使用hdfs中的hbase_import.dat

在hbase中创建需要用到的表batch2

hbase(main):027:0> create 'batch2','c1'
Created table batch2
Took 1.3600 seconds
=> Hbase::Table - batch2
1
2
3
4
5
想要实现BulkLoad需要两步
第一步:先生成HFile文件
代码如下:

注意:此代码会生成底层的HFile文件
BatchImportBulkLoad
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.imooc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 批量导入
*
* 2.利用BulkLoad
* 在map阶段,把数据封装成put操作,将数据生成HBase的底层存储文件HFile
* 再将生成的HFile文件加载到表中
* Created by xuwei
*/
public class BatchImportBulkLoad {
public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
if(strs.length==4){
String rowkey = strs[0];
String columnFamily = strs[1];
String name = strs[2];
String val = strs[3];
ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey.getBytes());
Put put = new Put(rowkey.getBytes());
put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes());
context.write(rowkeyWritable,put);
}
}
}

public static void main(String[] args) throws Exception{
if(args.length!=3){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

String inPath = args[0];
String outPath = args[1];
String outTableName = args[2];

//设置属性对应参数
Configuration conf = new Configuration();
conf.set("hbase.table.name",outTableName);
conf.set("hbase.zookeeper.quorum","bigdata01:2181,bigdata02:2181,bigdata03:2181");

//封装Job
Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName);
job.setJarByClass(BatchImportBulkLoad.class);

//指定输入路径
FileInputFormat.setInputPaths(job,new Path(inPath));

//指定输出路径[如果输出路径存在,就将其删除]
FileSystem fs = FileSystem.get(conf);
Path output = new Path(outPath);
if(fs.exists(output)){
fs.delete(output,true);
}
FileOutputFormat.setOutputPath(job, output);

//指定map相关的代码
job.setMapperClass(BulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

//禁用Reduce
job.setNumReduceTasks(0);

Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf(outTableName);
HFileOutputFormat2.configureIncrementalLoad(job,connection.getTable(tableName),connection.getRegionLocator(tableName));


job.waitForCompletion(true);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
打jar包
D:\IdeaProjects\db_hbase>mvn clean package -DskipTests
....
[INFO] META-INF/native/ already added, skipping
[INFO] org/ already added, skipping
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 34.460s
[INFO] Finished at: Sun Nov 22 17:47:48 CST 2020
[INFO] Final Memory: 115M/1075M
[INFO] ------------------------------------------------------------------------

向集群中提交任务
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hbase-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.hbase.BatchImportBulkLoad hdfs://bigdata01:9000/hbase_import.dat hdfs://bigdata01:9000/hbase_out batch2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
第二步:加载HFile文件
在HBase客户端节点上执行下面命令,把HFile数据转移到表对应的region中。
[root@bigdata04 hbase-2.2.7]# hbase org.apache.hadoop.hbase.tool.BulkLoadHFilesTool hdfs://bigdata01:9000/hbase_out batch2

查看表batch2中的数据
hbase(main):001:0> scan 'batch2'
ROW COLUMN+CELL
a column=c1:age, timestamp=1778308217857, value=18
a column=c1:name, timestamp=1778308217857, value=zs
b column=c1:age, timestamp=1778308217857, value=29
b column=c1:name, timestamp=1778308217857, value=ls
c column=c1:age, timestamp=1778308217857, value=31
c column=c1:name, timestamp=1778308217857, value=ww
3 row(s)
Took 1.0230 seconds

批量导出

1
2
3
4
批量导出两种方式:

利用TableMapReduceUtil将数据导出 (需要开发MapReduce代码)
利用HBase内部提供的Export工具类

批量导出之TableMapReduceUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
将HBase中的表batch1中的数据导出到hdfs上面
表batch1中的数据如下:

hbase(main):001:0> scan 'batch1'
ROW COLUMN+CELL
a column=c1:age, timestamp=1778305406350, value=18
a column=c1:name, timestamp=1778305406350, value=zs
b column=c1:age, timestamp=1778305406350, value=29
b column=c1:name, timestamp=1778305406350, value=ls
c column=c1:age, timestamp=1778305406350, value=31
c column=c1:name, timestamp=1778305406350, value=ww
3 row(s)
Took 0.7332 seconds
BatchExportTableMapReduceUtil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.imooc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 批量导出
* 1.利用TableMapReduceUtil将数据导出
* Created by xuwei
*/
public class BatchExportTableMapReduceUtil {
public static class BatchExportMapper extends TableMapper<Text,Text>{
@Override
protected void map(ImmutableBytesWritable key, Result result, Context context)
throws IOException, InterruptedException {
//key在这里就是hbase的Rowkey
//result是scan返回的每行结果
byte[] name = null;
byte[] age = null;
try{
name = result.getValue("c1".getBytes(), "name".getBytes());
}catch (Exception e){}
try{
age = result.getValue("c1".getBytes(), "age".getBytes());
}catch (Exception e){}

String v2 = ((name==null || name.length==0)?"NULL":new String(name))+"\t"+((age==null || age.length==0)?"NULL":new String(age));

context.write(new Text(key.get()),new Text(v2));
}
}

public static void main(String[] args) throws Exception{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}

String inTableName = args[0];
String outPath = args[1];

//设置属性对应参数
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum","bigdata01:2181,bigdata02:2181,bigdata03:2181");

//组装Job
Job job = Job.getInstance(conf);
job.setJarByClass(BatchExportTableMapReduceUtil.class);

//设置map相关的配置
job.setMapperClass(BatchExportMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//禁用Reduce
job.setNumReduceTasks(0);

//设置输入信息
TableMapReduceUtil.initTableMapperJob(inTableName,new Scan(),BatchExportMapper.class,Text.class,Text.class,job);

//设置输出路径
FileOutputFormat.setOutputPath(job,new Path(outPath));

job.waitForCompletion(true);

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
打jar包
D:\IdeaProjects\db_hbase>mvn clean package -DskipTests
......
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 34.797s
[INFO] Finished at: Mon Nov 23 11:03:26 CST 2020
[INFO] Final Memory: 66M/1129M
[INFO] ------------------------------------------------------------------------

将jar包是上传到bigdata04上面,然后向集群提交任务
[root@bigdata04 hadoop-3.2.0]# hadoop jar db_hbase-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.hbase.BatchExportTableMapReduceUtil batch1 hdfs://bigdata01:9000/batch1

查看导出结果数据:
[root@bigdata04 hadoop-3.2.0]# hdfs dfs -cat /batch1/*
a zs 18
b ls 29
c ww 31
1
注意:想要导出什么格式的数据,具体的逻辑代码在map函数内部根据需求实现即可。

批量导出之HBase内部方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
使用HBase提供的Export工具类直接导出数据
[root@bigdata04 ~]# hbase org.apache.hadoop.hbase.mapreduce.Export batch1 hdfs://bigdata01:9000/batch2

注意:此种方式导出的数据格式是固定的
数据中的k1和v1是<ImmutableBytesWritable key, Result result>形式的

查看结果是这样的:
注意:直接使用cat命令查看会显示乱码,因为不是普通的文本文件

[root@bigdata04 ~]# hdfs dfs -cat /batch2/*
SEQ1org.apache.hadoop.hbase.io.ImmutableBytesWritable%org.apache.hadoop.hbase.client.Result#{MdAa;
_x0019_
a_x0012_c1age 218

a_x0012_c1name 2zs (Ab;
_x0019_
b_x0012_c1age 229

b_x0012_c1name 2ls (Ac;
_x0019_
c_x0012_c1age 231

c_x0012_c1name 2ww (

建议优先选择使用第一种,更加灵活,根据需求导出希望的数据格式。

HBase连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
注意:HBase2.1.2之后的版本,无需创建HBase线程池,HBase2.1.2提供的代码已经封装好,只需创建调用即可。
Connection conn = ConnectionFactory.createConnection(conf);
//conn在获取Table的时候,底层默认会从线程池中获取一个连接
Table table = conn.getTable(TableName.valueOf("s1"));

所以table在用完之后可以直接调用close方法即可,在程序关闭的时候调用conn的close方法即可。

查看源码最终可以追踪到这里:

org.apache.hadoop.hbase.client.ConnectionImplementation
默认会创建一个线程池,256个连接。
private ThreadPoolExecutor getBatchPool() {
if (batchPool == null) {
synchronized (this) {
if (batchPool == null) {
int threads = conf.getInt("hbase.hconnection.threads.max", 256);
this.batchPool = getThreadPool(threads, threads, "-shared", null);
this.cleanupPool = true;
}
}
}
return this.batchPool;
}

参考链接


本文标题:大数据开发工程师-快速上手NoSQL数据库HBase-2

文章作者:TTYONG

发布时间:2023年06月13日 - 09:06

最后更新:2023年07月04日 - 23:07

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8BNoSQL%E6%95%B0%E6%8D%AE%E5%BA%93HBase-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%