大数据开发工程师-第十二周 综合项目:电商数据仓库之用户行为数仓3


电商数据仓库之用户行为数仓3-数据生成与采集

1
2
接下来我们就来开发第一个模块:数据采集模块
这一块内容在开发的时候,我们需要先生成测试数据,一份是服务端数据,还有一份是客户端数据

数据生成

【客户端数据】用户行为数据

1
2
3
首先我们模拟生成用户行为数据,也就是客户端数据,主要包含用户打开APP、点击、浏览等行为数据
用户行为数据:通过埋点上报,后端日志服务器(http)负责接收数据
埋点上报数据基本格式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"uid":1001, //用户ID
"xaid":"ab25617-c38910-m2991", //手机设备ID
"platform":2, //设备类型, 1:Android-APP, 2:IOS-APP, 3:PC
"ver":"3.5.10", //大版本号
"vercode":"35100083", //子版本号
"net":1, //网络类型, 0:未知, 1:WIFI, 2:2G , 3:3G, 4:4G, 5:5G
"brand":"iPhone", //手机品牌
"model":"iPhone8", //机型
"display":"1334x750", //分辨率
"osver":"ios13.5", //操作系统版本号
"data":[ //用户行为数据
{"act":1,"acttime":1592486549819,"ad_status":1,"loading_time":100},
{"act":2,"acttime":1592486549819,"goods_id":"2881992"}
]
}
1
2
3
4
这个json串中的data是一个json数组,它里面包含了多种用户行为数据。
json串中的其它字段属于公共字段

注意:考虑到性能,一般数据上报都是批量上报,假设间隔10秒上报一次,这种数据延迟是可以接受的
1
所以在每次上报的时候,公共字段只需要报一份就行,把不同的用户行为相关的业务字段放到data数组中,这样可以避免上报大量的重复数据,影响数据上报性能,我们只需要在后期解析的时候,把公共字段和data数组总的每一条业务字段进行拼装,就可以获取到每一个用户行为的所有字段信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
act代表具体的用户行为,在这列出来几种
act=1:打开APP
属性 含义
act 用户行为类型
acttime 数据产生时间(时间戳)
ad_status 开屏广告展示状态, 1:成功 2:失败
loading_time 开屏广告加载耗时(单位毫秒)

act=2:点击商品
属性 含义
act 用户行为类型
acttime 数据产生时间(时间戳)
goods_id 商品ID
location 商品展示顺序:在列表页中排第几位,从0开始

act=3:商品详情页
属性 含义
act 用户行为类型
acttime 数据产生时间(时间戳)
goods_id 商品ID
stay_time 页面停留时长(单位毫秒)
loading_time 页面加载耗时(单位毫秒)

act=4:商品列表页
属性 含义
act 用户行为类型
acttime 数据产生时间(时间戳)
loading_time 页面加载耗时(单位毫秒)
loading_type 加载类型:1:读缓存 2:请求接口
goods_num 列表页加载商品数量

act=5:app崩溃数据
属性 含义
act 用户行为类型
acttime 数据产生时间(时间戳)

生成用户行为测试数据

1
这里实现不了,课程提供的用户行为生成接口,需要提供uid、课程订单,然后执行提前编写好的测试数据生成代码。

部署日志采集服务

1
2
部署日志采集服务,模拟埋点上报数据的流程,代码在db_data_warehouse中的data_collect这个子项目中,将这个子项目打成jar包,部署到bigdata04服务器中,并且启动此HTTP服务。
对data_collect执行打包操作,在cmd命令下执行 mvn clean package -DskipTests
1
2
3
4
5
6
7
8
9
10
D:\IdeaProjects\db_data_warehouse\data_collect>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------------------------------------------------------------
[INFO] Building data_collect 1.0-SNAPSHOT
[INFO] ----------------------------------------------------------------------
[INFO] Replacing main artifact with repackaged archive
[INFO] ----------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ----------------------------------------------------------------------
1
2
3
在bigdata04的/data/soft/目录下创建data_collect目录

1 [root@bigdata04 soft]# mkdir data_collect
1
2
3
4
5
6
7
8
然后把target目录下的data_collect-1.0-SNAPSHOT.jar上传到bigdata04的 /data/soft/data_collect里面

接着就可以启动这个项目了,这个其实就是一个web项目。
为了后面使用方便,我在这里面写一个启动脚本

[root@bigdata04 data_collect]# vi start.sh
nohup java -jar data_collect-1.0-SNAPSHOT.jar >> nohup.out &
[root@bigdata04 data_collect]# sh start.sh
1
2
3
4
确认是否成功启动
[root@bigdata04 data_collect]# jps -ml
1601 sun.tools.jps.Jps -ml
1563 data_collect-1.0-SNAPSHOT.jar
1
获取到的数据格式是这样的:

image-20230330233905222

image-20230330233917373

1
2
3
4
5
首先解析data属性的值,里面包含了多个用户的行为数据
并且每个用户的行为数据中还包含了多种具体的行为操作,因为客户端在上报数据的时候不是产生一条就上报一条,这样效率太低了,一般都会批量上报,所以内层json串中还有一个data参数,data参数的值是一个JSONArray,里面包含一个用户的多种行为数据

然后通过接口模拟上报数据,data_collect接口接收到数据之后,会对数据进行拆分,将包含了多个用户行为的数据拆开,打平,输出多条日志数据
日志数据格式是这样的:

image-20230330234319650

1
2
3
4
5
6
7
最终的日志数据会保存在data_collect这个日志采集服务所在的机器上,通过log4j记录在/data/log目录下面。
去确认一下:
[root@bigdata04 log]# ll
total 32
-rw-r--r--. 1 root root 20881 Jun 30 18:00 user_action.log
[root@bigdata04 log]# head -1 user_action.log
{"ver":"3.4.1","display":"1920x1080","osver":"7.1.1","platform":1,"uid":"1000

image-20230330234726391

1
到这为止,用户行为数据就生成好了。

【服务端数据】商品订单相关数据

1
2
3
4
5
6
7
8
9
10
11
12
13
接下来需要生成商品订单相关数据,这些数据都是存储在mysql中的
注意:MySQL在这里我使用的版本是8.x

相关表名为:
订单表:user_order
商品信息表:goods_info
订单商品表:order_item
商品类目码表:category_code
订单收货表:order_delivery
支付流水表:payment_flow
用户收货地址表:user_addr
用户信息表:user
用户扩展表:user_extend

image-20230330235704680

1
powerdesigner这个软件设计的
1
2
3
首先在MySQL中初始化数据库和表。
使用这个脚本进行初始化: init_mysql_tables.sql
初始化成功之后的效果如下:

image-20230331000219996

1
2
3
4
5
6
7
8
9
接下来需要向表中初始化数据。
使用generate_data项目中的这个类:GenerateGoodsOrderData

在具体执行之前需要先修改GenerateGoodsOrderData中的几个参数值
(1)code的值
(2)date的值
(3)user_num的值
(4)order_num的值
(5)修改项目的resources目录下的db.properties文件
1
下面就可以执行GenerateGoodsOrderData向MySQL中初始化数据了。

采集数据

采集用户行为数据

配置Flume的Agent

1
数据接收到以后,需要使用flume采集数据,按照act值的不同,将数据分目录存储
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
flume Agent配置内容如下:
useraction-to-hdfs.conf

# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置source组件
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/user_action.log
# 配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = "act":(\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = act
# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink组件
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata01:9000/data/ods/user_action/%Y%m%d/%{a
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#增加文件前缀和后缀
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log
# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

开始采集数据

image-20230331010029253

image-20230331010115320

采集商品订单相关数据

Sqoop的使用

1
2
3
4
5
下面我们需要将商品订单数据采集到HDFS里面,咱们前面分析过,在这里针对关系型数据库数据的采集

我们使用Sqoop
使用sqoop的导入功能,将MySQL中的数据导入到HDFS上面
那首先我们来看一下Sqoop的使用,因为Sqoop主要是一个工具,所以我们就快速的学习一下。
1
2
3
4
5
6
7
8
9
Sqoop目前有两大版本,Sqoop1和Sqoop2,这两个版本都是一直在维护者的,所以使用哪个版本都可以。
这两个版本我都用过,还是感觉Sqoop1用起来比较方便,使用Sqoop1的时候可以将具体的命令全部都写到脚本中,这样看起来是比较清晰的,但是有一个弊端,就是在操作MySQL的时候,MySQL数据库的用户名和密码会明文暴露在这些脚本中,不过一般也没有什么问题,因为在访问生产环境下的MySQL的时候,是需要申请权限的,就算你知道了MySQL的用户名和密码,但是你压根无法访问MySQL的那台机器,所以这样也是安全的,只要运维那边权限控制到位了就没问题。
sqoop2中引入了sqoop server(服务),集中管理connector(连接),而sqoop1只是客户端工具。
相对来说,Sqoop1更加简洁,轻量级。
Sqoop1的最后更新时间是2018年
Sqoop2的最后更新时间是2016年
Sqoop2我之前在使用的时候发现里面bug还是比较多的,相对来说Sqoop1更加稳定一些。
所以在这我们采用Sqoop1。
想要使用Sqoop1,先去官网下载安装包
1
注意:最终下载的sqoop1.4.7的安装是这个sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
1
2
3
这个安装包表示里面包含了hadoop-2.6.0的依赖,我们目前使用的是hadoop3.2.0,不过是可以兼容的,这样就没有必要重新编辑sqoop了。
Sqoop的安装部署很简单,因为Sqoop1只是一个客户端工具,直接解压,修改一下配置文件就行,不需要启动任何进程
Sqoop在执行的时候底层会生成MapReduce任务,所以Sqoop需要部署在Hadoop客户端机器上,因为它是依赖于Hadoop的。
1
2
3
4
5
6
7
我们在bigdata04机器上安装部署Sqoop
1:把Sqoop的安装包上传到bigdata04机器的/data/soft目录下
2:解压

3:修改配置文件的名称
[root@bigdata04 soft]# cd sqoop-1.4.7.bin__hadoop-2.6.0/conf
[root@bigdata04 conf]# mv sqoop-env-template.sh sqoop-env.sh
1
2
3
4
5
4:配置SQOOP_HOME环境变量
[root@bigdata04 conf]# vi /etc/profile
...
export SQOOP_HOME=/data/soft/sqoop-1.4.7.bin__hadoop-2.6.0
export PATH=.......$SQOOP_HOME/bin:$PATH
1
2
5:将MySQL 8的驱动jar包,添加到SQOOP_HOME的lib目录下,因为我们需要使用Sqoop操作MySQL
查看验证一下是否成功添加MySQL的驱动jar包
1
2
3
4
5
注意:使用hadoop 3.2.0版本的时候,需要在SQOOP_HOME的lib目录下增加commons-lang2.6.jar
查看验证一下是否成功添加commons-lang-2.6.jar

[root@bigdata04 sqoop-1.4.7.bin__hadoop-2.6.0]# ll lib/commons-lang-2.6.jar
-rw-r--r--. 1 root root 284220 Nov 10 2015 lib/commons-lang-2.6.jar
1
2
3
4
5
6
7
8
9
6:开放MySQL远程访问权限【开放权限以后集群中的机器才可以连接windows上的MySQL服务,否则只能在windows本地访问】
注意:我的MySQL的用户名为root,密码为admin,大家在执行下面命令的时候需要对应替换为自己的MySQL的真实用户名和密码。

C:\Users\yehua>mysql -uroot -padmin
mysql> USE mysql;
mysql> CREATE USER 'root'@'%' IDENTIFIED BY 'admin';
mysql> GRANT ALL ON *.* TO 'root'@'%';
mysql> ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'admin'
mysql> FLUSH PRIVILEGES;
1
2
3
4
到此为止,Sqoop相关的配置全部搞定。
下面我们就来分析一下Sqoop中的两大功能。
导入数据sqoop-import:从MySQL导入HDFS
导出数据sqoop-export:从HDFS导出MySQL
1
来看一下Sqoop的文档

image-20230404152619779

image-20230404152632314

image-20230404152729853

通用参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
整理了文档中一些参数的解释:
Sqoop通用参数:

选项 含义说明
--connect <jdbc-uri> 指定JDBC连接字符串(JDBC一般指Java数据库连接)
--connection-manager <class-name> 指定要使用的连接管理器类
--driver <class-name> 指定要使用的JDBC驱动类
--hadoop-mapred-home <dir> 指定$HADOOP_MAPRED_HOME路径
--help 万能帮助
--password-file 设置用于存放认证的密码信息文件的路径
-P 从控制台读取输入的密码
--password <password> 设置认证密码
--username <username> 设置认证用户名
--verbose 打印详细的运行信息
--connection-param-file <filename> 可选,指定存储数据库连接参数的属性文件
导入参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
导入功能相关参数

选项 含义说明
--append 将数据追加到HDFS上一个已存在的数据集上
--as-avrodatafile 将数据导入到Avro数据文件
--as-sequencefile 将数据导入到SequenceFile
--as-textfile 将数据导入到普通文本文件(默认)
--boundary-query <statement> 边界查询,用于创建分片(InputSplit)
--columns <col,col,col…> 从表中导出指定的一组列的数据
--delete-target-dir 如果指定目录存在,则先删除掉
--direct 使用直接导入模式(优化导入速度)
--direct-split-size <n> 分割输入stream的字节大小(在直接导入模式下)
--fetch-size <n> 从数据库中批量读取记录数
--inline-lob-limit <n> 设置内联的LOB对象的大小
-m,--num-mappers <n> 使用n个map任务并行导入数据
-e,--query <statement> 导入的查询语句
--split-by <column-name> 指定按照哪个列去分割数据
--table <table-name> 导入的源表表名
--target-dir <dir> 导入HDFS的目标路径
--warehouse-dir <dir> HDFS存放表的根路径
--where <where clause> 指定导出时所使用的查询条件
-z,--compress 启用压缩
--compression-codec <c> 指定Hadoop的codec方式(默认gzip)
--null-string <null-string> 如果指定列为字符串类型,使用指定字符串替换值为
--null-non-string <null-string> 如果指定列为非字符串类型,使用指定字符串替换值
全表导入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
下面来看一下数据导入功能:
数据导入可以分为全表导入和查询导入
(1)全表导入:直接把一个表中的所有数据全部导入到HDFS里面
先在MySQL中创建一个数据库和表
C:\Users\yehua>mysql -uroot -padmin
mysql> create database imooc;
Query OK, 1 row affected (0.09 sec)
mysql> use imooc;
Database changed
mysql> create table user(id int(10),name varchar(64));
Query OK, 0 rows affected (0.60 sec)
mysql> insert into table user(id,name) values(1,'jack');
Query OK, 1 row affected (0.16 sec)
mysql> insert into table user(id,name) values(2,'tom');
Query OK, 1 row affected (0.08 sec)
mysql> insert into table user(id,name) values(3,'mike');
Query OK, 1 row affected (0.05 sec)
1
2
3
4
5
6
7
8
9
10
使用Sqoop将imooc.user表中的数据导入到HDFS中
sqoop import \
--connect jdbc:mysql://192.168.182.1:3306/imooc?serverTimezone=UTC \
--username root \
--password admin \
--table user \
--target-dir /out1 \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t'
1
2
3
4
5
6
注意:如果表中没有主键则会报错(因为mapper数默认是4,需要分4个Task。但是info表又没有主键,MapReduce不知道以哪个字段为准来分Task。)

解决办法有三种:
可以选择在表中设置主键,默认根据主键字段分task
使用–num-mappers 1,表示将map任务个数设置为1,sqoop默认是4
使用–split-by,后面跟上一个数字类型的列,会根据这个列分task
查询导入
1
2
3
4
5
6
7
8
9
10
11
12
(2)查询导入:使用sql语句查询表中满足条件的数据导入到HDFS里面
注意:在使用–query指定sql的时候,则必须包含$CONDITIONS

sqoop import \
--connect jdbc:mysql://192.168.182.1:3306/imooc?serverTimezone=UTC \
--username root \
--password admin \
--target-dir /out2 \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--query 'select id,name from user where id >1 and $CONDITIONS;'
空值如何处理
1
2
3
4
5
6
7
8
9
10
注意:–query和–table不能同时指定
问题:sqoop在导入数据的时候针对空值如何处理?
默认情况下MySQL中的null值(无论字段类型是字符串类型还是数字类型),使用Sqoop导入到HDFS文件中之后,都会显示为字符串null。
针对字符串null类型:通过 --null-string '*' 来指定,单引号中指定一个字符即可,这个字符不能是--,因为 -- 是保留关键字

针对非字符串的null类型:通过 --null-non-string '=' 来指定,单引号中指定一个字符即可,这个字符不能是 -- ,因为 -- 是保留关键字
这两个参数可以同时设置,这样在导入数据的时候,针对空值字段,会替换为指定的内容。

例如:可以使用\N ,因为我们把数据导入到HDFS之后,最终是希望在Hive中查询的,Hive中针对NULL值在底层是使用\N存储的。
当然了,我们也可以选择给NULL值指定一个默认的其它字符。
1
2
3
4
5
6
7
8
9
10
11
sqoop import \
--connect jdbc:mysql://192.168.182.1:3306/imooc?serverTimezone=UTC \
--username root \
--password admin \
--target-dir /out2 \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--query 'select id,name from user where id >1 and $CONDITIONS;' \
--null-string '\\N' \
--null-non-string '\\N' \
导出参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
--validate <class-name> 启用数据副本验证功能,仅支持单表拷贝,可以
--validation-threshold <class-name> 指定验证门限所使用的类
--direct 使用直接导出模式(优化速度)
--export-dir <dir> 导出过程中HDFS源路径
--m,--num-mappers <n> 使用n个map任务并行导出
--table <table-name> 导出的目的表名称
--call <stored-proc-name> 导出数据调用的指定存储过程名
--update-key <col-name> 更新参考的列名称,多个列名使用逗号分隔
--update-mode <mode> 指定更新策略,包括:updateonly(默认)、
--input-null-string <null-string> 使用指定字符串,替换字符串类型值为null的列
--input-null-non-string <null-string> 使用指定字符串,替换非字符串类型值为null的
--staging-table <staging-table-name> 在数据导出到数据库之前,数据临时存放的表名
--clear-staging-table 清除工作区中临时存放的数据
--batch 使用批量模式导出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
接下来我们来看一下Sqoop的导出功能
从HDFS导出到MySQL,将刚才导入到HDFS中的数据再导出来。
sqoop export \
--connect jdbc:mysql://192.168.182.1:3306/imooc?serverTimezone=UTC \
--username root \
--password admin \
--table user2 \
--export-dir /out2 \
--input-fields-terminated-by '\t'

注意:这里 --table 指定的表名需要提前创建,sqoop不会自动创建此表。

C:\Users\yehua>mysql -uroot -padmin
mysql> use imooc;
Database changed
mysql> create table user2(id int(10),name varchar(64));
Query OK, 0 rows affected (0.24 sec)
1
2
3
4
5
6
7
8
9
验证结果,查询MySQL中的数据
mysql> select * from user2;
+------+------+
| id | name |
+------+------+
| 3 | mike |
| 2 | tom |
+------+------+
2 rows in set (0.00 sec)
实现插入和更新功能
1
2
3
4
5
6
7
8
在导出的时候可以实现插入和更新功能
如果存在就更新,不存在就插入

注意:此时表中必须有一个主键字段

将user2中的id字段设置为主键,
然后修改user2中id为2那条数据的name字段的值为imooc
删除id为3的那条数据

image-20230404155237504

1
2
3
4
5
6
7
8
修改之后的user2表中的数据如下:
mysql> select * from user2;
+----+-------+
| id | name |
+----+-------+
| 2 | imooc |
+----+-------+
1 row in set (0.00 sec)
1
2
3
4
5
6
7
8
9
10
执行sqoop语句
sqoop export \
--connect jdbc:mysql://192.168.182.1:3306/imooc?serverTimezone=UTC \
--username root \
--password admin \
--table user2 \
--export-dir /out2 \
--input-fields-terminated-by '\t' \
--update-key id \
--update-mode allowinsert
1
2
3
4
5
6
7
8
9
再验证一下结果,会发现针对已有的数据更新,没有的数据新增。
mysql> select * from user2;
+------+------+
| id | name |
+------+------+
| 3 | mike |
| 2 | tom |
+------+------+
2 rows in set (0.00 sec)
1
2
3
4
5
6
7
8
9
10
11
这就是Sqoop的导入和导出功能。
后期我们在使用Sqoop的时候,建议将Sqoop的命名写到shell脚本中,否则使用起来不方便。
[root@bigdata04 soft]# vi sqoop-ex-user.sh
#!/bin/bash
sqoop export \
--connect jdbc:mysql://192.168.182.1:3306/imooc?serverTimezone=UTC \
--username root \
--password admin \
--table user2 \
--export-dir /out2 \
--input-fields-terminated-by '\t'

数据采集方式

1
2
3
4
5
全量采集(数据量不大,每天都改变:一天采集一次;数据量不大,几十年都不变:只做一次全量采集)

增量采集(数据量大,每天采集新增数据)

hive不能对数据进行修改(比如mysql中的表的订单信息已改变,但hive中不支持修改)->解决:拉链表

image-20230331012207048

1
2
3
4
5
注意:手机号在采集的时候需要脱敏处理,因为数据进入到数据仓库之后会有很多人使用,为保护用户隐私,最好在采集的时候进行脱敏处理。
所以在采集user和user_addr表中的数据时对手机号进行脱敏。

18315138177
183xxxxx177

数据采集脚本开发

1
2
3
下面就开始进行数据采集,其实就是使用Sqoop实现的数据导入
开发一个通用的sqoop数据采集脚本
在bigdata04机器上创建目录 /data/soft/warehouse_shell_good_order ,针对商品订单相关的脚本全部放在这里面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[root@bigdata04 soft]# mkdir warehouse_shell_good_order

创建脚本 sqoop_collect_data_util.sh

[root@bigdata04 warehouse_shell_good_order]#vi sqoop_collect_data_util.sh
#!/bin/bash
# 采集MySQL中的数据导入到HDFS中
if [ $# != 2 ]
then
echo "参数异常:sqoop_collect_data_util.sh <sql> <hdfs_path>"
exit 100
fi

# 数据SQL
# 例如:select id,name from user where id >1
sql=$1

# 导入到HDFS的路径
hdfs_path=$2

sqoop import \
--connect jdbc:mysql://192.168.182.1:3306/mall?serverTimezone=UTC \
--username root \
--password admin \
--target-dir "${hdfs_path}" \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--query "${sql}"' and $CONDITIONS' \
--null-string '\\N' \
--null-non-string '\\N'
1
2
3
注意:如果在windows中使用notepad++开发shell脚本的时候,需要将此参数设置为UNIX。
这个我们之前在讲shell的时候已经讲过了,在这再重复一遍。
否则在windows中开发的脚本直接上传到linux中执行会报错。

image-20230404161003265

开始采集数据

1
2
针对全量数据采集和增量数据采集开发不同的脚本
全量数据采集: collect_data_full.sh
全量采集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
[root@bigdata04 warehouse_shell_good_order]#vi collect_data_full.sh
#!/bin/bash
# 全量数据采集
# 每天凌晨执行一次

# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi


# SQL语句
user_sql="select user_id,user_name,user_gender,user_birthday,e_mail,concat(left(mobile,3), '****' ,right(mobile,4)) as mobile,register_time,is_blacklist from user where 1=1"
user_extend_sql="select user_id,is_pregnant_woman,is_have_children,is_have_car,phone_brand,phone_cnt,change_phone_cnt,weight,height from user_extend where 1=1"
user_addr_sql="select addr_id,user_id,addr_name,order_flag,user_name,concat(left(mobile,3), '****' ,right(mobile,4)) as mobile from user_addr where 1=1"
goods_info_sql="select goods_id,goods_no,goods_name,curr_price,third_category_id,goods_desc,create_time from goods_info where 1=1"
category_code_sql="select first_category_id,first_category_name,second_category_id,second_catery_name,third_category_id,third_category_name from category_code where 1=1"

# 路径后缀
path_prefix="hdfs://bigdata01:9000/data/ods"

# 输出路径
user_path="${path_prefix}/user/${dt}"
user_extend_path="${path_prefix}/user_extend/${dt}"
user_addr_path="${path_prefix}/user_addr/${dt}"
goods_info_path="${path_prefix}/goods_info/${dt}"
category_code_path="${path_prefix}/category_code/${dt}"

# 采集数据
echo "开始采集..."
echo "采集表:user"
sh sqoop_collect_data_util.sh "${user_sql}" "${user_path}"
echo "采集表:user_extend"
sh sqoop_collect_data_util.sh "${user_extend_sql}" "${user_extend_path}"
echo "采集表:user_addr"
sh sqoop_collect_data_util.sh "${user_addr_sql}" "${user_addr_path}"
echo "采集表:goods_info"
sh sqoop_collect_data_util.sh "${goods_info_sql}" "${goods_info_path}"
echo "采集表:category_code"
sh sqoop_collect_data_util.sh "${category_code_sql}" "${category_code_path}"
echo "结束采集..."
增量采集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
增量数据采集: collect_data_incr.sh

#!/bin/bash
# 增量数据采集
# 每天凌晨执行一次

# 默认获取昨天的日期,也支持传参指定一个日期
if [ "z$1" = "z" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi

# 转换日期格式,20260101 改为 2026-01-01
dt_new=`date +%Y-%m-%d --date="${dt}"`

# SQL语句
user_order_sql="select order_id,order_date,user_id,order_money,order_type,order_status,pay_id,update_time from user_order where order_date >= '${dt_new} 00:00:00' and order_date <= '${dt_new} 23:59:59'"
order_item_sql="select order_id,goods_id,goods_amount,curr_price,create_time from order_item where create_time >= '${dt_new} 00:00:00' and create_time <= '${dt_new} 23:59:59'"
order_delivery_sql="select order_id,addr_id,user_id,carriage_money,create_time from order_delivery where create_time >= '${dt_new} 00:00:00' and create_time <= '${dt_new} 23:59:59'"
payment_flow_sql="select pay_id,order_id,trade_no,pay_money,pay_type,pay_time from payment_flow where pay_time >= '${dt_new} 00:00:00' and pay_time <= '${dt_new} 23:59:59'"

# 路径后缀
path_prefix="hdfs://bigdata01:9000/data/ods"

# 输出路径
user_order_path="${path_prefix}/user_order/${dt}"
order_item_path="${path_prefix}/order_item/${dt}"
order_delivery_path="${path_prefix}/order_delivery/${dt}"
payment_flow_path="${path_prefix}/payment_flow/${dt}"

# 采集数据
echo "开始采集..."
echo "采集表:user_order"
sh sqoop_collect_data_util.sh "${user_order_sql}" "${user_order_path}"
echo "采集表:order_item"
sh sqoop_collect_data_util.sh "${order_item_sql}" "${order_item_path}"
echo "采集表:order_delivery"
sh sqoop_collect_data_util.sh "${order_delivery_sql}" "${order_delivery_path}"
echo "采集表:payment_flow"
sh sqoop_collect_data_util.sh "${payment_flow_sql}" "${payment_flow_path}"
echo "结束采集..."
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
执行脚本
验证结果:

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/ods
Found 10 items
drwxr-xr-x - root supergroup 0 2026-01-01 10:30 /data/ods/category_
drwxr-xr-x - root supergroup 0 2026-01-01 10:30 /data/ods/goods_inf
drwxr-xr-x - root supergroup 0 2026-01-01 10:42 /data/ods/order_de
drwxr-xr-x - root supergroup 0 2026-01-01 10:42 /data/ods/order_it
drwxr-xr-x - root supergroup 0 2026-01-01 10:43 /data/ods/payment_f
drwxr-xr-x - root supergroup 0 2026-01-01 10:29 /data/ods/user
drwxr-xr-x - root supergroup 0 2026-01-01 11:04 /data/ods/user_act
drwxr-xr-x - root supergroup 0 2026-01-01 10:30 /data/ods/user_add
drwxr-xr-x - root supergroup 0 2026-01-01 10:29 /data/ods/user_ext
drwxr-xr-x - root supergroup 0 2026-01-01 10:41 /data/ods/user_ord
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/ods/user_order/20260101
Found 2 items
-rw-r--r-- 2 root supergroup 0 2026-01-01 10:42 /data/ods/user_ord
-rw-r--r-- 2 root supergroup 74618 2026-01-01 10:42 /data/ods/user_ord
1
2
3
4
执行脚本

[root@bigdata04 warehouse_shel_good_order]# sh collect_data_full.sh 20260101
[root@bigdata04 warehouse_shel_good_order]# sh collect_data_incr.sh 20260101
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/ods
Found 10 items
drwxr-xr-x - root supergroup 0 2026-01-01 10:30 /data/ods/category_
drwxr-xr-x - root supergroup 0 2026-01-01 10:30 /data/ods/goods_inf
drwxr-xr-x - root supergroup 0 2026-01-01 10:42 /data/ods/order_de
drwxr-xr-x - root supergroup 0 2026-01-01 10:42 /data/ods/order_it
drwxr-xr-x - root supergroup 0 2026-01-01 10:43 /data/ods/payment_f
drwxr-xr-x - root supergroup 0 2026-01-01 10:29 /data/ods/user
drwxr-xr-x - root supergroup 0 2026-01-01 11:04 /data/ods/user_act
drwxr-xr-x - root supergroup 0 2026-01-01 10:30 /data/ods/user_add
drwxr-xr-x - root supergroup 0 2026-01-01 10:29 /data/ods/user_ext
drwxr-xr-x - root supergroup 0 2026-01-01 10:41 /data/ods/user_ord
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/ods/user_order/20260101
Found 2 items
-rw-r--r-- 2 root supergroup 0 2026-01-01 10:42 /data/ods/user_ord
-rw-r--r-- 2 root supergroup 74618 2026-01-01 10:42 /data/ods/user_ord

本文标题:大数据开发工程师-第十二周 综合项目:电商数据仓库之用户行为数仓3

文章作者:TTYONG

发布时间:2023年03月30日 - 21:03

最后更新:2023年06月18日 - 18:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%BA%8C%E5%91%A8-%E7%BB%BC%E5%90%88%E9%A1%B9%E7%9B%AE-%E7%94%B5%E5%95%86%E6%95%B0%E6%8D%AE%E4%BB%93%E5%BA%93%E4%B9%8B%E7%94%A8%E6%88%B7%E8%A1%8C%E4%B8%BA%E6%95%B0%E4%BB%933.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%