大数据开发工程师-第十八周 直播平台三度关系推荐v1.0-2


第十八周 直播平台三度关系推荐v1.0-2

数据采集架构详细设计

image-20230424234832289

1
2
3
4
5
6
7
大家好,下面呢,我们就从我们整体架构里面的第一个模块数据采集模块开始。注意,在实际过程中,数据采集模块不是只针对某一个项目而言的,而是一个公共的采集平台,所有项目依赖的数据全部都来源于数据采集模块,所以在设计采集模块的时候要考虑通用性。不能仅仅是为了这一个项目而服务。

咱们前面在分析整体架构的时候说过,filebeat采集的数据到达kafka以后,会通过flume再做一下分发,为什么要有这个分发这个过程呢?这个分发过程实现了什么功能呢?我们来看一下这张图。这个图里面呢,针对数据采集模块做了详细的分析,把数据采集模块呢,又划分了三层,数据采集聚合层,数据分发省数据落盘层。

在这个数据采集聚合层,我们为了保证采集程序的通用性,不至于每次新增一个业务指标的数据,就去重新增加一个采集进程,或者修改采集程序的配置文件。所以呢,我们定义了一个规则。所有的日志数据全部保存在服务器的一个特定的目录下面。我会让filebeat的监控这个目录下面的所有文件。如果后期有新增业务日志,那么就会在这个目录下新增一种日志文件,filebeat就可以自动识别。但是这个时候会有一个问题。filebeat的输出只有一个。多种类型的日志数据会被filebeat采集到同一个topic中。如果各种类型的日志数据全部混到一块儿,会导致后期处理数据的时候比较麻烦。本来呢,我只想计算一种数据,但是这个时候我就需要读取这个大的topic。它里面呢,包含了很多种数据类型。这里面的一个数据量也很大,那我计算的时候呢,我就需要把它里面所有数据全部都读出来,然后再过滤。这样在计算的时候就会影响计算效率,也间接的浪费了计算资源。所以针对这个问题,我们又定义了一个规则,所有的日志数据全部使用json格式,并且呢,在json中增加一个type字段,标识数据的类型,这样每一条数据都有自己的类型标识,然后汇聚到kafka中的一个大的topic中。为了后面使用方便,我们就需要把这个大的topic中的数据啊,根据业务类型进行拆分,把不同类型的数据啊分发到不同的topic中。那这块的话,其实就是我们这个数据分发层要干的事情,相当于filebeat呢,它呢会把这个数据啊,全部都采集到kafka里面这个大topic里面。所有业务类型的日全部都采集到这一个topic里面。

然后呢,我们接着就可以使用这个flume,对kafka中这个大topic中的数据进行分发。利用flume中的拦截器解析数据中的那个type字段的值,把type字段的值作为输出topic的一个名称。这样就可以把相同类型的数据分发到同一个topic中了。当然了,这些topic呢,我们需要提前创建。如果想要提高这个数据分发的能力,我们还可以在这儿启动多个flume进程。只需要保证多个flume中指定相同的group.id就可以了。这样就可以并行执行这个数据分发操作。那把这个数据分发到对应的topic里面以后呀,后面的实施计算程序就可以直接消费这个topic进行计算了。不需要再去读取那个大的topic,这样就可以提高计算性能。并且呢,我们还可以把需要备份的topic里面的数据啊,使用flume进行落盘,把它保存到Hdfs里面。这个呢,就是数据采集架构的详细设计。

数据来源分析

image-20230425104401098

服务端日志数据

1
下面我们来分析一下,针对这个项目,我们需要采集哪些业务类型的数据,以及这些数据来源于什么地方。首先是服务端日志数据。什么是服务端日志呢?可以这样理解。就是我们在APP中点击一些按钮的时候,例如我们要关注一个主播,这个时候当我们点击这个关注按钮之后。APP呢,它会去请求对应的接口。接口中的代码逻辑就是将我们关注的数据保存到数据库里面。同时,这个接口也会记录一份日志。因为这个接口呢,是为APP提供后台服务的,所以它记录的日志我们称之为服务端日志。接着我们来简单画一个图来看一下。

image-20230518171257153

1
2
3
这个呢,是隔壁老王。那是我们的一个用户。画的形象一点。这是我们的一个APP。我们这个APP里面呢,它有一个关注功能。这时候呢,你看老王啊,他会点击这个关注功能。要关注某一个主播。当他点击这个关注功能之后,这个APP里面这个关注功能,它对应的它会调用一个接口。那我们这块呢,有一个后台服务器。这个服务器里面呢,部署的有一个接口服务,就是这种HTTP接口。那对应的这个关注功能,其实呢,还会调那个接口。所以你在这呢,点击APP里面这个关注按钮。它底层啊,其实会调这个接口。这个接口呢,其实呢,它会操作一个数据库。相遇老王在这儿呢,关注了一个主播,最终啊调这个接口。接口呢去操作这个数据库,最终把老王关注了,谁把这个数据呢存到数据库里面。就是这个逻辑啊。那这个时候注意我们在这个接入服务里面呢,因为它俩现在也是一个外部项目,所以什么它里面可以记录日志,那这里面记录的日志呢。我们就把它称为是服务端日志。

那在我们这个项目里面,针对服务端日志。主要包含实时粉丝、观众数据以及视频数据。这个实时粉丝关注数据啊,是因为用户呢,在点击关注以及取消关注的时候,都需要调用服务端接口。所以这个数据呢,会在服务端通过日志记录。还有就是这个视频数据。下面这个视频啊,其实就是直播。当主播关闭直播的时候,会调用服务端接口上报本次直播的相关指标数据。其实服务端记录的还有很多其他类型的数据,只不过说呢,我们这个项目目前呢,只需要这两种数据。

服务端数据库数据

image-20230518171613701

1
那接下来我们来看一下服务端数据库中的数据。注意服务端数据库中的数据啊,其实啊,就是我们刚才图里面这个数据库里面的数据。来看一下。就这块儿。就相当于啊,我们App某一些功能呢,它会调用这个后台的接口,然后调用这个接口之后呢,最终啊,其实操作的是这个数据库。我们所说的这个服务端数据库的数据,其实说的就是这块它里面的数据。那在这个项目里面,我们主要获取历史粉丝关注数据,以及呢主播等级数据。这里面我们需要历史粉丝关注数据,因为我们在做这个项目的时候,我们的直播平台已经运营了两三年了。所以说,我们需要把历史粉丝关注数据初始化到图数据库中。这些历史数据服务端存储在数据库中,所以说我们需要从数据库里面去取。还有就是这个主播的等级数据。其实这个数据呢,在服务端日志中也有,但是我们考虑到服务端数据库中的数据是最准确的。特别是针对用户相关的数据,最好是以服务端数据库中的为准。所以说呢,我们就从那个服务端数据库中,每天凌晨啊,定时把昨天等级发生了变化的这个主播等级数据呢,导入到hdfs,方便我们后面的离线计算时。

客户端日志数据

1
2
3
4
5
6
7
8
9
10
11
最后呢,我们来看一下客户端日志数据。刚才我们分析了服务端日志。

那什么是客户端日志呢?其实啊,就是用户在APP客户端操作的时候。直接通过埋点上报的日志数据,这种数据称之为客户端日志数据。我们在这再画一下这个图。如果说呀,你在这做了一些操作,你说呢,我不调用这个后台这个接口。我呢通过买点直接上报用户这个行为说你关注了谁对吧,直接通过买点上报。这个时候的话,在这呢,我们会有一个。这个日志接口服务器。他这个呢,就是直接通过这个埋点上报。那最终啊,可以把这个用户的行为数据啊,直接上报到这个日志接口服务器里面。然后就没有,然后。他不会去操作数据库啊。里面啊,一般上报的都是一些用户的行为。咱们刚才你看服务端日志呢,现在这啊,我们是要调这个接口,最终呢,是要操作数据库的,要把我们这个关注行为。就是谁关注,谁要把这个数据给它保存到数据库里面。

而这种呢,通过埋点上报呢,其实呢,他就不会和那个数据库打交道。所以说这种话一般称之为是客服端日志。是通过客户端直接上报的,不需要和这个服务端这个接口去交互的。这里记录的日志,我们称之为后端日志。

那这个服务端日志和客端日志有什么区别吗?为什么再分成两种日志呢?以及说为什么有的地方我们使用客户端日志,为什么有的地方我们使用服务端日志呢。针对我们这个APP里面啊,它这个关注功能服务端记录的这个日志啊,会更加准确。因为服务端接口里面,它会涉及到对于数据库的一个操作里面会有事务。只有这条数据真正保存成功的时候,才会记录日志,如果说你在操作数据库保存失败了。现在你会回滚,这时候就不需要去记录这个操作日志了。你顶多记录一条失败的日志。但是客户端日志,只要用户在APP里面点击了一次关注功能,他就会上报一次日志。最终啊,这个日志接收服务器啊,就会接收到这个日志,并且呢,把它记录下来。他可能呢,会由于网络等原因啊,导致最终关注失败,但是呢,你这条日志。你是发过来了,并且呢,这块啊也记录下来。所以相对来说,服务端数据的准确性是比这个客户端日志这个准确性高的。

如果说一份数据在服务端日志中和客户端日志中同时都有。那我们肯定要优先选择服务端中的日志数据。一般啊,我们在客户端通过埋点上报的数据啊,都是一些用户行为数据,这些数据啊,就算有一些误差也没有多大影响。它不涉及到一些和数据库交互的一些操作。那在我们这个项目中,针对客户端日志,我们只要获取用户活跃数据。活跃呢表示啊,只要用户每天打开APP就认为用户活跃,用户的这些行为数据呢,会在客户端通过埋点上报管。

那这些呢,就是我们项目中需要的一些基础数据,主要是这五份儿数据。那刚才我们分析到服务端日志,服务端数据库数据,以及客户端日志,那这个这个图里面啊,其实这样这个呢,就是服务端日志数据。这块呢,就是服务端数据库里面的数据。那这块呢,其实就是客户端的一些日志数据。

image-20230518172542961

模拟产生数据

image-20230518174114092

image-20230425104534521

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
前面我们分析了具体需要什么数据,以及数据从哪里来,下面我们来看一下如何通过代码模拟产生这些数据。先看这个图。我们通过执行generate date这个项目中的这五个入口类,可以模拟产生这五种数据。就是咱们前面分析的那五种数据。那在这里注意一下,因为我们不能直接使用企业中的真实数据啊,所以在这里我会根据企业中真实数据的格式去模拟生成。最终的效果是没有区别的。

我们来看一下这些对应的代码。生成服务端数据和客户端数据代码如下:

【服务端日志】实时粉丝关注数据: GenerateRealTimeFollowData
【服务端日志】视频数据:GenerateVideoInfoData
【服务端数据库】历史粉丝关注数据:GenerateHistoryFollowData
【服务端数据库】主播等级数据:GenerateUserLevelData
【客户端日志】用户活跃数据:GenerateUserActiveData
在执行这些代码的时候还是需要使用之前在微信公众号中获取的校验码

注意:在执行这些代码之前,我们需要先把基础环境搞定了
这些代码在执行的时候会调用服务端接口和客户端日志接收服务,以及还会向MySQL中写入数据
所以需要把这两个服务部署起来,以及在MySQL中初始化数据库和对应的表。

所以说我们需要提前把这个两个接口服务,以及mysql中数据库和对的表都给它初始化好,都给它部署起来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
第一步:
对data_collect项目编译打包

部署data_collect
将生成的jar包上传到bigdata04机器的/data/soft/video_recommend/data_collect目录下,如果目录不存在则创建

[root@bigdata04 soft]# mkdir -p /data/soft/video_recommend/data_collect
# 中间省略了上传jar包的过程
[root@bigdata04 soft]# cd video_recommend/data_collect/
[root@bigdata04 data_collect]# ll
total 16932
-rw-r--r--. 1 root root 17336051 Aug 29 2020 data_collect-1.0-SNAPSHOT.jar
[root@bigdata04 data_collect]# nohup java -jar data_collect-1.0-SNAPSHOT.jar &

测试服务是否正常
注意:这个服务监听的端口是8080
[root@bigdata04 data_collect]# jps -ml
1598 data_collect-1.0-SNAPSHOT.jar
[root@bigdata04 data_collect]# curl -XGET 'http://localhost:8080/v1/t1?name=test'
{"status":200,"msg":"success"}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
第二步:
对server_inter项目编译打包

部署server_inter
将生成的jar包上传到bigdata04机器的/data/soft/video_recommend/server_inter目录下,如果目录不存在则创建

[root@bigdata04 data_collect]# mkdir -p /data/soft/video_recommend/server_inter
# 中间省略了上传jar包的过程
[root@bigdata04 data_collect]# cd ../server_inter/
[root@bigdata04 server_inter]# ll
total 16932
-rw-r--r--. 1 root root 17336083 Aug 29 2020 server_inter-1.0-SNAPSHOT.jar
[root@bigdata04 server_inter]# nohup java -jar server_inter-1.0-SNAPSHOT.jar &

测试服务是否正常

注意:这个服务监听的端口是8081
[root@bigdata04 server_inter]# jps -ml
1673 server_inter-1.0-SNAPSHOT.jar
1598 data_collect-1.0-SNAPSHOT.jar
[root@bigdata04 server_inter]# curl -XGET 'http://localhost:8081/s1/t1?name=test'
{"status":200,"msg":"success"}
1
2
第三步:
初始化数据库脚本

image-20230518174258258

1
2
3
4
5
6
7
8
接下来执行代码,开始模拟产生数据
1:【服务端日志】实时粉丝关注数据: GenerateRealTimeFollowData
注意:修改服务端接口地址,我是在bigdata04机器上部署的
代码执行之后,可以看到服务端记录的日志数据
[root@bigdata04 ~]# cd /data/log/
[root@bigdata04 log]# more server_inter.log
{"followuid":"2002","followeruid":"2001","type":"user_follow","times
tamp":1598684303487,"desc":"unfollow"}
1
2
3
4
5
2:【服务端日志】视频数据:GenerateVideoInfoData
注意:修改服务端接口地址,我是在bigdata04机器上部署的
代码执行之后,可以看到服务端记录的日志数据
[root@bigdata04 log]# tail -1 server_inter.log
{"area":"A_US","watchnumpv":364,"follower":364,"hosts":364,"watchnumuv":364,"gifter":364,"nofollower":364,"length":464,"rating":"A","smlook":364,"type":"video_info","gold":364,"uid":"2009","nickname":"jack38","looktime":364,"id":"1769913940296","exp":364,"timestamp":1769913940000}
1
2
3
4
3:【服务端数据库】历史粉丝关注数据:GenerateHistoryFollowData
注意:执行这个代码的时候需要注意修改项目中的db.properties中数据库的地址信息
代码执行之后,可以到数据库中看到数据
查看数据库中的follower_00这个表,发现里面有数据了
1
2
3
4
4:【服务端数据库】主播等级数据:GenerateUserLevelData
注意:执行这个代码的时候需要注意修改项目中的db.properties中数据库的地址信息
代码执行之后,可以到数据库中看到数据
查看数据库中的cl_level_user这个表,发现里面有数据了
1
2
3
4
5
5:【客户端日志】用户活跃数据:GenerateUserActiveData
注意:修改客户端日志接收服务器地址,我是在bigdata04机器上部署的
代码执行之后,可以看到客户端埋点上报的日志数据
[root@bigdata04 log]# head -1 data_collect.log
{"uid":"1000","ver":"3.6.41","countryCode":"VN","ip":"171.247.0.154","UnixtimeStamp":"1598587868773","mcc":"452","type":"user_active"}
1
至此,项目中需要的数据都可以正常产生了。
1
那接下来第一步我们先对这个data collect这个项目给他呢编译打包。嗯。CDDB。video。recommend。C的这个data collect。肯定package。港地skip test。好,编译成功。那把它上传到我们这个服务器上面,注意在这呢,我统一都上传到我们这个BD的零四这台机器上面。那接着呢,我统一再建一个目录。每个点啊。V6。recommend。所以说呢,针对这个项目相关的一些架包啊,那些东西全部都放到这个目录里面,这样方便管理维护啊。那我们进到这个目录下面,注意现在里面呢,我在创建一个目录。叫data collect。那这里面呢,就放我们这个data这个接口,这个夹包。打开这个图形化界面。把我们刚才生成那个架包给他传上去。it soft。在这。把这个家包传上来。好,那下面呢,我们把它启动起来。Java杠架。后面呢,直接指定那个另一个加包就行了,注意我们需要把它放到后台运行,所以说呢,前面加个no ho。后面呢,加个and符,这样就可以了。GPS一下。看到没有,这个架已经启动了,你可以这样GPS杠ML。这样看起来更加清晰,对吧,就这个data collect。那下面呢,我们来测试一下这个服务啊是否正常,因为它是一个接口服务。这里面呢,我们其实开放的有一个测试的一个接口。看到没有?你直接调用这个就行,它是一个测试结果,当然你在这需要传一个参数啊,它需要提出一个参数name。他会把它打印出来,这样可以确认一下你这个服务是否正常啊。这个怎么验证呢?它是一个HTTP请求,你可以选择在我们的浏览器里面去操作,或者说呢,我们直接在这个控制台里面也是可以的,通过curl这个命令杠X。盖着操作啊。HTTP冒号双斜线logo。冒号,注意它这个端口呢,是8080。我在这指定了。看到没有8080啊。VT对吧。name。等于test吧。你看状态是200是OK的啊,说明我那个服务呢是正常启动了。那接下来第二步,我们把这个server这个项目呢,给它编译打包。有这个项目。他是负责写收我们那个服务日志的。嗯。clean港。开始。好,编译成功。把它传上来。注意,那我们在这需要创建一个目录啊。你可以在这直接对吧,右键这样创建也是可以的啊都可以。serve。因此。嗯。好,上传成功。那这个呢,我们要把它启动一下。no Hu Java刚加。安福。嗯。先确认这个服务是不是在啊在吗。好,这个呢,我们也来验证一下。大家注意它这个接口名称呢,就不是这个,你可以到这儿来确认一下。这是S1T1对吧,以及呢,它的端口注意。因为我们是在同一台服务器上启动的,所以说呢,这两个项目啊,它监听的端口肯定是不能重复的啊,这个我给它改成80812。四上这个是8081。后面的是S1T1对吧。嗯。好,OK啊,还是这个size。好,那这样的话,这两个接口服务呢,就搞定了,这个以这个都部署好了。那接着第三步,我们需要把这个MY面这个数据库啊,还有表啊,给它触发好,这个呢,给大家提供了有这个触发脚本也比较方便啊。现在这右键。进行一个so文件。直接把这个脚本。指另外就行了。就是in my circle tables啊。这个脚本。好,执行成功。在这刷新一下。好,它呢,会产生这么些表啊。OK,这个后面我们具体用的时候再来具体查看。那接下来呢,我们就需要去执行这些代码了。这样的话呢,就可以模拟产生数据了。首先呢,我们先产生这个实时粉丝关注数据。就这个服务端呢。啊,服务端日志。模拟生成实时粉丝关注和取消关注数据。注意了,大家在下面呀,在执行我这个代码的时候,需要注意,你们需要改点东西。注意在上面,你看这个是调用我线上这个接口来获取这个模拟数据,获取到之后呢,注意。下面呢,会调用我们本地刚才部署的那个接口服务。因为这个呢是服务端日志,所以说它会调那个服务端接口。我呢是在BD的零四这台机箱部署的。对吧,这个服装接口,它这个对应的端口号是8081。对吧。后面这些东西呢,都不用改,你们下去改的话,主要改一下这个对吧,你在哪一台机器上去部署的这个什么server这个接入服务,那你在这就把那个IP或者主机名写到这就可以了。所以接着的话,就可以模拟产生这个服务端的这个调用请求,最终呢,去调用这个服务端这个接口,让这个接口呢记录日志。那你说这个接口,它具体把日志记录到哪个目录下面呢?注意,在这也能看懂啊。在resources这个目录下面有一个log back点查明。看到没有,我把这个日呢记到这个Di这个母下面了。然后他这块呢,具体的一个日志文件名称是server下换in加log。那对应的这个对collect看了也有。它日志呢,也是放到这个电log这个目录下面。然后它的日文名称呢,叫data collect。所以说这个到时候可以通过这个文件名就知道到底是哪个接口服务记录的日志啊,也好分析。那我们这个代码呢,都是OK的,以及这些接口这些东西呢,我们也不需要改,对吧,也都是OK的,下面呢,我们来执行一下。注意这个代码呢,它是实时产生数据,现在呢,你在这执行一次,它会产生一条数据,就是模拟那个实时产生啊。看到没有接口调用成功,注意这块是我在这记录的日志啊。借口立项成功,它最终产生的日是这个。好,那我们来看一下,我们具体去调这个服务端接口,这个服务端接口有没有把这个日给记录下来了,我们来确认一下。在这重新克隆一个绘画。对,log。到这个木下面。看到没有?这个里面其实已经有值了,对吧,我们可以在这摸看一下啊。没问题吧,他已经记录下来了,说明我们这个流程目前是通的啊。这个呢,就是服务端的实时粉丝关注数据。那接下来往下面看。这个generate video for。这个也是服务端日志对吧,是模拟生成视频相关的数据。就是你这个主播开播结束之后呢,其实它就会调用服务端接口。把当前这个直播的相关的一些信息上报过去啊。那注意你这个在用的时候也是一样的流程,对吧,这个上面还是就我那个接口。那下面的话呢,你需要在这对应改一下对吧。好,那我在这来执行一下。注意这个呢,一次性我可以采用多条数据啊。你看接口交换成功是OK的。那下面注意我们来确认一下数据。因为这时候它里面产生数据比较多,我就使用T杠一吧,查看里面最新的一条数据。对吧。也怪了。OK,所以说这个呢,是服务端的这两种类型的数据都采集过来了。往下面看。那接着呢,我们来看一下这个服务端数据库里面的数据。首先呢,是这个。就是这个历史,粉丝关注数据。注意。这个电板执行之后呢,它会把这个数据啊,初始化到我们那个数据库里面啊。注意,你在执行这个代码之前,你需要先执行我这个脚本。进行这个数据库和表的一个初始化。初始化之后的话就可以直接执行了,这里面别的不需要改动了,注意你要改点东西。看到没有,要改一下数据库啊。你的一个数据库在哪,把那个IP地址啊,以数据库啊,一个名称啊,对吧,用户名啊,密码啊对吧,这些东西都改一下就可以了。来,我们来执行一下。从那个里面你可以看出来,他最终把数据啊,写到这个FOLLOW00这个表里面。好,我们到这来看一下。好,数据呢,都写进来了。这些测数据啊。这个呢,其实就是历史的一个粉丝关注出去了。那接下来注意我们来看这个主播等级的。也是这个服务端数据库里面数据模拟生成主播等级数据啊。它也是操作数据库。那这里面的话,他会把数据写到这个level user这个表里面来执行一下。OK,我们来确认一下。好,数据都写进了。那这样的话,相当于服务端数据库里面这两份数据呢,我们也都给他触发好了。那其实还剩一份数据,就是这个客端数据,就是那个用户活跃的。gena use active。对吧,客端是模拟生成用户活跃数据。这个呢,还是通过调用我这个接口获取数据。那下面注意这时候呢,它会调用我们那个客户端的那个日志收集服务器,对吧,其实就是一个client。他们最终会调用他,把数据上报给他。所以你的用处呢,也要改成这个,看一下你这个data client,你部署到哪个机器,把这个改一下就可以了。来执行一下。好,这个data可爱呢,他接收到用户上报的这些数据之后呢,他呢也会把它记录日志记录到本地。嗯。嗯。在这他又起到这个did collect点里面。我们取头一条还杠一。看到没有,就这个这个类型是user active,看到没有,都是接格式的啊。那到此为止呢,项目中需要的数据呢,我们都可以正常产生了。

客服端日志接收服务

image-20230425110236188

image-20230425105526920

服务端http接口

image-20230425110135498

image-20230425110105056

image-20230425110444470

实时粉丝关注

image-20230425111454589

视频数据

image-20230425111654573

历史粉丝关注

image-20230425112531665

image-20230425112548087

历史主播等级

image-20230425113019519

客户端用户活跃日志

image-20230425113331494

image-20230425113349659

数据采集聚合

image-20230425115912855

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
咱们前面把数据产生好了,下面就可以进行数据采集了,首先呢是使用这个filebeat呢,将所有日志数据采集到kafka的一个topic中中,把这个日志数据采集到kafka里面。日志数据有三份,服务端有两份,客户端日志有一份。注意,在这具体执行之前,我们需要先把这个zookeeper以及kafka这两个服务给它启动起来。

启动zk
[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@bigdata02 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@bigdata03 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

启动kafka
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties
[root@bigdata02 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties
[root@bigdata03 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties


好,那接下来呢,我们需要在kafka中呢,去创建一些topic。我们主要创建哪些topic呢?就根据我们前面啊,在那个架构图里面分析的,首先第一个是一个大的topic,它里面呢,包含所有的这种日数据。all_type_data_r2p40,因为它是一个大topic里面数据量比较大,所以说呢,我们给它多分一些分区,后期的话呢,就可以提高你一个消费能力了。这个里面呢,存储所有采集过来的日志数据。
我们后面其实还有一个数据分发层,分发层的话,相当于把这里面的数据啊,这个它分发出来。我们服务端有两种数据,一个呢是这个实时的一个粉丝的关注和取消关注数据。
这个它具体那个类型的名称呢,叫user_follow。这个topic这个名称后面为什么不加这种后缀了呀。这是因为啊,这个数据它相当于是我们的原始json数据里面的某一个自动的值,就那个type自动的值,这个日志啊,是之前记录下来的,所以后期我们再用的话,就把它直接拿过来用就行了,就不要再去改它这个值了,改起来就比较麻烦了。并且呢,你这个对应起来也不太好对应,所以说呢,我们就使用那个原始那个数据里面那个type值作为这个topic贝这个名称。

那下面还有一个video_info。这个里面存储视频信息

还有一个客户端的。user_active就是用户的活跃数据。存储客户端上报的用户活跃数据。
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 40 --replication-factor 2 --topic all_type_data_r2p40
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic user_follow
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic video_info
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic user_active
[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic default_r2p5

注意后面还有一个default_r2p5,所以这个topic是干什么呢?注意现在啊,我们要把这个里面的数据给它分发到这几个topic里面。那在分发的时候有可能有一些数据啊,它里面没有type字段呀,或者说那个type字段的值不是这三个里面的。也就是说那个数据异常,这样的话,你需要把那个异常数据放到这个topic。这里面的存储无法具体分配到对应topic的异常数据。

所以说呢,我们下面就需要去创建这些topic。

image-20230425120032836

1
2
3
4
5
6
7
8
那接下来我们呢,就可以使用filebeat的去采集数据了,注意现在filebeat呢,我们还没有安装呢。需要安装到哪些机器上面呢?注意你要看你之前那个客户端的那个接口,以及服务端的接口,你都部署到哪些服务器上面,我在这呢,都给它部署到这个bigdata04上面了。因为filebeat呢,要采集data_collect这个接口记录的日志,以及server_inter这个接口记录的日志,所以说呢,你后期这两个接口你部署到哪台机器上面,那么你在对应的机器上就需要部署这个filebeat去采集数据。

解压一下
注意这个filebeat就是一个采集工具。所以说呢,使用起来很简单,我们只需要在里面指定输入和输出就可以。我们先修改它的配置文件,在这个目录下面有一个这个filebeat.yml这个配置文件

注意它是一个YML格式的,不是那种XML这种格式的啊,这是种新型的那种配置文件格式。

你可以在这配置它的输入。你看它这有一个什么呀,这个类型是日志的,但是目前这个呢没有开启。我们在这呢,想先测试一下。在测试的时候呢,我就在这个配置文件里面,把这个输入啊,给它指定成标准的一个输入,就是键盘输入,输出的话呢,就把它指定成控制台。这样分析起来好分析,也比较直观。那怎么加呢?注意其实我们在这里面需要加一个输入,就是配置一个输入。对吧,你就按照它这个格式去写就行了,冒号注意后面一个空格,注意这个空格是不能少的。找那个输出。没有output。输出注意你看它默认呢,现在开启了一个什么呀,这个输出呢,是把数据输出的这个elasticserch,那现在我们先不用这个,你把它注掉。就指定那个console,注意下面呢,我们来加一些参数,注意下面这个缩进,我能不能使用那个tab,不行,必须使用空格。你可以使用两个或者四个。pretty。注意这个表示什么意思啊,它表示啊,会把你这个输出的结果啊,给你格式化一下,要不然是非常乱的啊。大家后期啊,在改这个配置文件的时候一定要注意啊,首先是这个冒号后面这个空格不能少,还有呢,就是说你这个在做缩进的时候不能使用制表符,要使用空格,官方建议使用四个,你用两个啊三个啊都可以啊。那下面呢,我们需要把这个配置好的这个配置文件呢,给它上传上去行吗?保存一下。如果说你对这个文件熟悉了之后,你可以直接在这里面去改都是可以的。不熟悉的话,建议呢,你还是把这个配送件拿到本地去,改完之后呢再传上来。直接覆盖就行啊。那这样的话就可以啊,下面我们就可以启动filebeat

image-20230425120937212

image-20230425121132023

image-20230425121239236

1
[root@bigdata04 filebeat-7.4.2-linux-x86_64]# ./filebeat -c filebeat.yml

image-20230425121824126

1
2
3
输出了一堆东西。虽然是一堆啊,还是有格式的,是一个阶层格式的,注意这个呢,就是因为我们在这加了这个属性,等于true,它呢会对你返回的数据呢,你做一个格式化。反馈的数据,你看就这个message里面内容,其他内容呢,都是它里面默认加的啊。接着你想停掉的话,直接CTRLC对吧。

接下来我们需要继续修改配置文件,因为我们是希望filebeat的可以采集日志文件中的数据,将数据呢采集到kafka中。把刚才我们那个配置啊都给它删掉。然后呢,我们把这个基于日志的这个类型啊,给它开启了。这样的话它就可以读取文件了,你看path里面指定一些路径,注意你可以指定一个或者多个都可以。你指定一个的话,你可以这样,你指定一个目录下面可以使用那种通配符也是OK的啊。所以说的话,它可以监控多个目录里面的多个文件。这是输入就配置好了

image-20230425122235895

1
2
3
4
5
6
7
8
9
10
接下来我们需要配置输出。这是一个输出组件,注意在这我们需要指定那个kafka的一个输出组件,那在这都需要指定什么配置呢?这个信息呢,到他那个官网上面是可以找到的啊,所以在边我就直接拿过来,你看output.kafka,把数据呢,输入到卡夫卡里面,直行卡卡的一个博物块地址对吧。123。还有这个topic,我们要把这个数据呢写到这里面。下面这几个配置你不用改就行了,这个呢表示啊,它往这个topic里面这些分区写数据的一个规则。这个表示卡里面那个艾机制是吧。这个呢是就是往里面写数据的时候,对这个数据15度压缩,这块呢,使用那个机会压缩可以提高性能。这个呢,表示呢,每一条数据最大的一个字节数量。好这样就可以了,你们下用的时候呢,有可能要改这以及这对吧,如果说你都和我的一样的话,其实都不用改。OK,这样的话就可以了,下面呢,把这个配置文件重新再传一下。

这样就可以实现将服务端日志和客户端日志全部都采集到kafka中的all_type_data_r2p40这个topic中了。

注意:filebeat需要部署在所有服务端接口机器和客户端日志收集机器上,

在实际工作中,服务端接口机器会有多个,我们当时的服务端接口机器有100多台,客户端日志收集机器是有6台,在部署的时候是通过运维同学开发的部署工具批量部署的,要不然一个一个部署会疯的。
在这由于服务端接口和客户单日志收集服务都在bigdata04上面,所以我就只需要在bigdata04上部署一套即可。

在这里我们暂时先不启动filebeat进程,等我们把采集模块中所有的配置全部修改好了以后,从后面挨个开始启动进程,这样不会漏数据。

image-20230425122506901

数据分发

1
下面呢,我们来看一下数据分发层,就是使用实现对采集到kafka指定topic里面的数据啊进行分发。咱们前面呢,把这个日数据采集过来啊,全部都放到kafka一个大的topic里面了,所以说下面呢,我们需要使用flume对这个大的topic里面数据啊进行分发,分发到一些小的topic里面,这样可以方便使用。注意,那这样的话,我们就需要在这个flume里面增加一配置文件,就是从那个大的topic里面消费数据,通过拦截器获取数据中的一个type自段的值作为输出kafka的一个topic的名称。这个配置文件呢,在这儿我已经写好了,我们来直接看一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#source+channle+sink的名字
agent.sources = kafkaSource
agent.channels = fileChannl
agent.sinks = kafkaSink

# 指定source使用的channel
agent.sources.kafkaSource.channels = fileChannl
# 指定sink需要使用的channel的
agent.sinks.kafkaSink.channel = fileChannl


#-------- kafkaSource相关配置-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.batchSize = 1000
agent.sources.kafkaSource.batchDurationMillis = 1000
agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sources.kafkaSource.kafka.topics = all_type_data_r2p40
agent.sources.kafkaSource.kafka.consumer.group.id = flume_con_id_1

#----------------- 拦截器 -------------------
# 定义拦截器
agent.sources.kafkaSource.interceptors = i2 i1
# 设置拦截器类型
agent.sources.kafkaSource.interceptors.i1.type = regex_extractor
# 设置正则表达式,匹配指定的数据,这样设置会在数据的header中增加topic=aaa
agent.sources.kafkaSource.interceptors.i1.regex = "type":"(\\w+)"
agent.sources.kafkaSource.interceptors.i1.serializers = s1
agent.sources.kafkaSource.interceptors.i1.serializers.s1.name = topic
# 避免遇到数据中没有type字段的,给这些数据赋一个默认topic【注意:这个拦截器必须设置】
agent.sources.kafkaSource.interceptors.i2.type = static
agent.sources.kafkaSource.interceptors.i2.key = topic
agent.sources.kafkaSource.interceptors.i2.preserveExisting = false
agent.sources.kafkaSource.interceptors.i2.value = default_r2p5


#------- fileChannel相关配置-------------------------
# channel类型
agent.channels.fileChannl.type = file
agent.channels.fileChannl.checkpointDir = /data/filechannle_data/all_type_data/checkpoint
agent.channels.kafka2HdfsShow.dataDirs = /data/filechannle_data/all_type_data/data


#---------kafkaSink 相关配置------------------
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.topic = default
agent.sinks.kafkaSink.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
agent.sinks.kafkaSink.kafka.flumeBatchSize = 1
agent.sinks.kafkaSink.kafka.producer.acks = 1
agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
agent.sinks.kafkaSink.kafka.producer.compression.type = snappy
1
那接下来就把它呢传上去就可以了,注意如果说我们需要在一台机器中启动多个flume进程的时候,最好在里面复制多个conf目录,因为如果在一个conf目录中启动多个agent的进程的话,多个agent的进程的日志信息会混到一块,后期排查问题会很麻烦啊,这个我们之前呢讲过了对吧啊,所以说呢,在这。进到这个。都门面。我们来复制一个。打靠谱。告不告?卡不卡?高不高相以我们从卡夫卡里面读出去,再把数据写到卡夫卡里面。进到里面。改一下他这个log的这个配置文件。把那个改一下,搞搞不搞搞卡不卡。好,这样的话就可以了。嗯。OK,那接下来我们在这里面来创建这个配置文件,就是刚才我们分析的这个。敢不敢杠后杠?敢不敢点。com?嗯。我们就组织一下。好,这样就可以了,注意这块搞定之后呢,我们就需要往下面走了,在这啊,我们也先不启动啊,那我们把这个数据落盘这块也搞定之后呢,从后往前启动。

image-20230425214444152

image-20230425214458087

数据落盘

image-20230425215638800

1
2
3
接下来我们来看一下这个数据落盘层。我们需要使用采集指定topic各种数据进行落盘,便于离线计算。你看咱们前面呢,把这个数据呢,全部都采集到kafka这个大的topic里面,接下来做了一个分发,分发之后后面我们就要做这个落盘了。把我们需要落盘的数据呢,给它呢,落到这个HDFS上面,注意如果这里面这个大topic里面的所有类型的数据呢,我们都需要落盘的话,我们就可以直接读取这个大topic里面的所有数据,然后呢,使用这个拦截器。根据数据类型把它们呢保存到hdfs的多个目录中,这样呢比较方便,不过在我们这个项目中,这个大的topic里面一共有三种类型的数据,其中呢,有两种数据我们是需要进行落盘后期进行离线计算的。有一种数据呢是不需要的,只有实时计算上讲会用的,所以说我们可以使用两个flume agent来对我们需要的数据执行落盘操作。当然了,我们也可以啊,只使用一个flume agent的,那你这里面啊,也可以使用拦截器,只获取我们需要落盘的那两种数据,这样也是可以的,这个呢,给大家留一个作业,大家下去自己研究一下。使用flume拦截器如何呢?获取我们需要的两种数据,然后呢,分别把它呢落盘到hdfs的不同目录下面。

那我在这呢,就使用两个agent来实现了,我们需要落盘的这两个topic呀,分别是这个user_active和这个video_info。所以说呢,在这我还需要复制两个目录。

image-20230425221141206

1
2
3
那我们进去去改一下那个logo的配置。那接下来我们还需要在这两个目录里面创建对应的一个配置文件,这两个配置文件啊,我也提前写好了,我们来看一下。

因为这个东西啊,它就是一个体力活。咱们前面用过很多次了啊,没有什么技术含量。首先看这个user active,这都没什么好处了。S呢,是一个卡夫卡S,你读取这个user active。那个group使用这个be China。注意,下面是这个H里边的think。按T分目录存储就行了,放到对目录下面一个user。这就可以。那这个呢,是一个video。对吧,它读取的是一个video in这个topic。然后呢,就是说放到这个data木下面有个video。对吧,这是那个年月日。看天分,母乳好。那我在这呢,把这个复制一下。这是一个user active。脸卡不卡?到as。到。user。active。com。好,这个搞定。下一个先进到这个目录里面这个音符,把这个呢复制一下。VI、卡夫卡杠杠、video info。好。好,那这两个呢,我们都配置好了。所以接下来我们需要先确认一下这个含毒不集群啊,是否启动。对吧,我们之前已经起来了啊。好,那到此为止啊,这个采集相关的这个配置呢,我们都修改好了,下面呢,我们就要来启动一下。我们要从后往前启动,所以说呢,我们先启动这个数据落盘的这个辅助。我们来启动一下。诺哈普。b my name。agent康复指定配置文件跟目录。这个是com user active先求这个。进行配置文件刚刚。靠谱。在康复,然后呢。有点active下面有一个。啊,不搞搞HS user active com这个文件。agent。好,接下来呢,是第二个,我把前面这些能敷的呢复制过来。这个呢是video。刚刚看。六。下面有一个卡杠,杠HS杠video杠infor.com。最后是这个大纲内。agents。嗯。确认一下。这是一个,这是一个对吧,这两个呢都起来,嗯。这就是这两个数据落盘的一个a的进程啊,那接下来再往回推,我们需要把那个数据分发那个给它起下来。说。NG到康复。到卡夫卡,到卡夫卡。刚刚康复杠。这个木下面有一个。卡夫卡,卡夫卡点。com。我们之前少写一个这个F的这个强迫症,我给他改一下啊。来确认一下,确实少一个F,这个也不影响啊,只不过看起来有点别扭。好。这就可以了。怎么呢,重新来启动的啊,刚才也没启动成功啊no。b agent。等到。卡夫卡,卡夫卡,然后呢,刚刚。哎。卡夫卡,卡夫卡下面有一个卡夫卡,卡夫卡加。祷告,name isn'。好。可以确认一下啊。对吧,这个也启动好。那最后呢,我们就要启动那个Bob的采集程了。接着来启动。牛逼了,杠C。比点两秒。注意啊,针对这个feel进程啊,正式启动的时候也需要使用这个no ho,把它放到后台运行在这里,我们为了一会儿使用方便,所以说我先在前台启动了,对吧。我们按一个回车把它启动起来,那接下来呢,我们就可以启动生成数据的程序了。注意在这呢,我们先执行那个generate real time for这个实时生成那个粉丝关注和取消关注的数据啊。来执行一下。

kafka-hdfs-user-active.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#source+channel+sink的名字
agent.sources = kafkaSource
agent.channels = kafka2Hdfs
agent.sinks = hdfsSink

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = kafka2Hdfs
# 指定sink需要使用的channel的名字
agent.sinks.hdfsSink.channel = kafka2Hdfs


#-------- kafkaSource相关配置-----------------
# 定义消息源类型
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定义kafka所在zk的地址
agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
# 配置消费的kafka topic
agent.sources.kafkaSource.kafka.topics = user_active
# 配置消费者组的id
agent.sources.kafkaSource.kafka.consumer.group.id = user_active_con_1

#------- fileChannel-1相关配置-------------------------
# channel类型
agent.channels.kafka2Hdfs.type = file
agent.channels.kafka2Hdfs.checkpointDir = /data/filechannle_data/user_active/checkpoint
agent.channels.kafka2Hdfs.dataDirs = /data/filechannle_data/user_active/data

#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
# 注意, 我们输出到下面一个子文件夹datax中
agent.sinks.hdfsSink.hdfs.path = hdfs://bigdata01:9000/data/user_active/%Y%m%d
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.callTimeout = 3600000

#当文件大小为104857600字节时,将临时文件滚动成一个目标文件
agent.sinks.hdfsSink.hdfs.rollSize = 104857600
#events数据达到该数量的时候,将临时文件滚动成目标文件
agent.sinks.hdfsSink.hdfs.rollCount = 0
#每隔N s将临时文件滚动成一个目标文件
agent.sinks.hdfsSink.hdfs.rollInterval = 3600

#配置前缀和后缀
agent.sinks.hdfsSink.hdfs.filePrefix=run
agent.sinks.hdfsSink.hdfs.fileSuffix=.data

kafka-hdfs-video-info.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#source+channel+sink的名字
agent.sources = kafkaSource
agent.channels = kafka2Hdfs
agent.sinks = hdfsSink

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = kafka2Hdfs
# 指定sink需要使用的channel的名字
agent.sinks.hdfsSink.channel = kafka2Hdfs


#-------- kafkaSource相关配置-----------------
# 定义消息源类型
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定义kafka所在zk的地址
agent.sources.kafkaSource.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
# 配置消费的kafka topic
agent.sources.kafkaSource.kafka.topics = video_info
# 配置消费者组的id
agent.sources.kafkaSource.kafka.consumer.group.id = video_info_con_1

#------- fileChannel-1相关配置-------------------------
# channel类型
agent.channels.kafka2Hdfs.type = file
agent.channels.kafka2Hdfs.checkpointDir = /data/filechannle_data/video_info/checkpoint
agent.channels.kafka2Hdfs.dataDirs = /data/filechannle_data/video_info/data

#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
# 注意, 我们输出到下面一个子文件夹datax中
agent.sinks.hdfsSink.hdfs.path = hdfs://bigdata01:9000/data/video_info/%Y%m%d
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.callTimeout = 3600000

#当文件大小为104857600字节时,将临时文件滚动成一个目标文件
agent.sinks.hdfsSink.hdfs.rollSize = 104857600
#events数据达到该数量的时候,将临时文件滚动成目标文件
agent.sinks.hdfsSink.hdfs.rollCount = 0
#每隔N s将临时文件滚动成一个目标文件
agent.sinks.hdfsSink.hdfs.rollInterval = 3600

#配置前缀和后缀
agent.sinks.hdfsSink.hdfs.filePrefix=run
agent.sinks.hdfsSink.hdfs.fileSuffix=.data

启动

1
然后将hadoop集群启动起来,因为数据落盘会使用到HDFS

数据落盘

1
2
3
好 到这为止,采集需要的配置都修改好了
下面我们就来启动一下
1:先启动数据落盘的flume

image-20230425221947341

image-20230425222021013

数据分发

1
2:再启动数据分发的flume

image-20230425222459145

数据采集聚合

1
2
3
3:最后启动filebeat采集进程

注意:针对filebeat进程,正式启动的时候也需要使用nohup 放在后台运行,在这里我们为了一会使用方便,所以先在前台启动。

image-20230425222700964

执行数据生成程序

1
2
3
4
5
6
4:接下来启动生成数据的程序
先执行GenerateRealTimeFollowData
验证效果
查看all_type_data_r2p40这个topic中的数据

验证

image-20230425223043250

image-20230425223117407

1
所以执行成功了,那我们来验证一下结果。我们到那个卡夫卡里面来消费一下。基于console的。注意这个内容还是比较多的,但是我们可以大致看一下啊,你看。这是一条数据,是不是很乱呀?我们的日志数据没有这么乱吧,并且我们的日志数据里面也没有这个东西。对吧,我们的日志数据里面其实是这些东西在这,你看它其实啊,其实在这面又封装了一层,他把我这个具体的业务数据啊,给封装到这个message字段里面。注意这些字段的话,相对来讲是filebeat默认生成的,那这些字段呢,它不是我们需要的,我们希望啊,只记录我们的原始日志即可。那怎么解决这个问题呢?可以解决啊,我们需要在那个filebeat那个配置文件里面加一个配置。需要在filebeat中的output.kafka里面增加codec.format配置。在这相当于啊,你要对它输出这个数据啊,做一个格式化。

image-20230425223504841

1
CD format。做一个格式化。四六类形的。对。我们呢,只取它里面的这个东西,哎,问号首先呢,括括号中括号我们只取里面那个message。所以这个配置的意思呢,就是说我们从这个数据里面。你看它这个采集过来数据里面有很多字段,我们只需要message字段里面内容,其实这里面就是我们的原始的日志数据。然后把这个filebeat再停一下。你再来启动,我们再来执行这个生成数据的这个。看到没有?这个就正常了,这样就可以

image-20230425223717162

1
那接下来我们再来查看一下这个user_follow这个topic,这个看一看它里面能不能消费到数据,如果能消费到数据,就说明那个flume的数据分发过程是没有问题的啊

image-20230425224013711

1
收到了吧。注意你在这能收到就说明啊,我们中间那个flume的分发程序是OK的,他从那个大的topic里面把这个数据读出来,然后呢写到这里,这样就可以好。那接下来我们再来执行两个程序。一个是这个active。其实呢,我们之前已经执行过了,在这我们再重新执行一下啊。然后重新呢,再生成一批数据,就是往那个日志文件里面再写一批啊,因为你调用接口,这个接口就会记录了,以及这个video info。好,这个执行成功之后呀,注意我们就可以到那个HDPS里面去确认一下结果数据了。因为你这个执行之后啊,他呢,这个接口就会把那个日志记录到本地。这个接口呢,就会把那个日志数据啊,记录到这个本地的日文件里面,然后filebeat呢,发现你里面啊有新增数据,所以说他就把这个数据给读出来,读出来之后呢,把它呢,采集到那个大的topic,就是all type data那个topic里面。然后呢,这里面有数据之后,后面那个的数据分发程序,它就会读取这个数据,对吧,对这个数据做分发,然后呢,把这个数据分发到不同的那个子topic里面,那我们最后呢,还有一个flume数据落盘层。他呢,就会从那个子topic里面把那些数据呢读出来,最终呢落到hdfs
1
所以下面我们来验证一下。看到没有,这个user active,对这个都是有的。好,这个目录来我们直接查看一下它里面的数据啊。使用管道来取,前一条取一条就行,好里面有数据。那说明是OK的,那其实对应的我可以直接把这个改一下,改成video。看一下他们的数据。也是有了。没有问题,好。那在这我们都可以获取到数据,那就说明了是没有问题的,这就意味着我们前面的整个数据采集流程是通的。那大家在下面做实验的时候呀,我估计啊,可能会遇到各种各样的问题啊,就是大家在操作这一块的时候,如果发现最终啊,看不到我们这个希望的结果。那你就需要一步一步去排查,你要确认一个数据到了哪一步。你先看一下那个大套贝里面有没有数据,然后呢,再看一下那个子套贝壳里面有没有数,如果子套贝格里面也有数据,那最终hps里面没数据,那肯定是你那个数据落盘程序有问题。所以说你需要一点点去分析啊。

image-20230425230808125

image-20230425230907753

image-20230425230920558

1
针对这个filebeat呀,我再多说一点,这个filebeat它在采集日文件中的数据的时候呢,它会将日文件数据的采集的一个偏移量啊,记录到本地文件里。所以说它在这会记录一下,这样话你filebeat给重启之后,它呢会读取这个文件,然后呢,根据你上一次记录的off继续往下面消费。它可以保证,就算你那个filebeat要停了,那在它停的中间,你往那个日里面记录数据,它后期启动之后还可以把它呢采集过来是这样。注意了,如果说呢,我们想要让这个filebeat的重启之后啊,继续重新开始消费。这样怎么办呢?暴力一点的话,我们可以直接把这个。这个data目录啊,给它删掉,你删掉之后这些信息是不是就没了,没了之后它就认为它是一个新文件,就从头开始读了啊,这个需要注意一下。

image-20230425231605565

1
还有一点就是我们在使用filebeat的时候啊,如果发现filebeat没有正常工作,这个时候呢,我们需要去查看filebeat的日志文件,来排查具体是什么问题,因为有时候有一些错误信息啊,他不会暴露到这个工作台上面,它会记录到日志文件里面。像few b的下面有一个move。看到没有,它下面有这个这文件,注意你看的话就看这个,它后面这个相当于是一些备份的啊。你直接看这个就行。这是最新的一些日志。这些呢,是之前的一些老的日志。如果说他有一些错误信息在这里面就可以看。你在这儿可能看不到,它不会暴露到这个地方啊,你说呢,我把这个飞票启动了,也没见他报错呀,结果呢,他也没有把数据采集到我的卡夫卡里面,那所以说你就需要来这儿来看啊,看那个日文面。好,那针对服务端日志和这个客端日志的一个数据采集啊,我们先讲到这儿,在这呢,大家主要掌握数据采集的整理思路,重点是里面那个数据聚合,数据分发这两个步骤,那个数据落盘呢,倒没什么特殊的,对吧,咱们之前已经用过很多次。

采集服务端数据库数据

1
2
3
前面呢,我们把这个服务端日志以及客户端日志呢采集过来,下面呢,我们还需要将服务端数据库的数据也采集过来。咱们前面分析了啊,由于历史粉丝关注数据呢,只需要导出一次,所以说呢,没必要使用sqoop,那还有一份数据呢。是那个每日的主播等级数据对吧?所以这份数据呢,我们在最开始的时候会将数据库里面的全量数据导出来一份,后期只需要根据表中的更新字段获取发生了变化的数据即可,这样每天需要导出的数据量就很小了,属于增量采集。这时候呢,可以选择使用sqoop,其实呢也可以选择使用shell脚本,sqoop的使用在上一个项目中我们已经用过了,所以接着呢,我们来讲点不一样的,我来使用shell脚本,将mysql中的数据导出来,所以说么,这两份数据我全部呢使用mysql脚本。把它倒出来。那首先呢,我们使用这个mysql里面那个-e这个命令,将这个具体的查询命令啊准备好。

注意我其实在这呢,可以直接操作我Windows本地的那个MYSQL,这个里面它默上是有那个MYSQL客户端的,你如果没有的话,你就装一下啊,我们使用mysql -e后面的话就可以写具体的sql了。因为你现在也不是连本地的马,你是连其他机器的马,对吧。我们在这个list里面连接我们Windows里边那个马。那我们Windows的那个机器这个IP是也有2.168.182.1。这样就可以,这也可以编了哈。那这里面写了一个参考,我们先写那个历史粉丝关注数据啊。再来呢,它里面有两列,一个UID,一个呢是UID。from我们这个库的名称啊,是一个点。follow。零零,我们先查这一张表。看到没有,它是可以执行的。那以及呢,我们还要查询这个每日主播等级数据啊,就每天发生了变化的那些主播数据。注意它里面的话字段比较多。注意它里面有一个update time,就是更新字段啊。就这个,如果说这里面这个数据发生了变化。某一个字段被改了,那么这个就会变化,所以说我们每天呢,就根据它去抽取数据。up time。大于等于。所以我们这里面这个日期啊,目前的话看一下。那是2月1号对吧。26年2月1号,所以说我在里面这样来,我就把这些数据给它查出来,二六杠零二杠零幺。000000对吧。只要凌晨开始按着。update time。小于等于。2026杠零二杠零一。23:59:59,这样的话就可以把这些数据全都查出来。没问题吧,也是可以的啊。好,那这两条命令啊
1
历史粉丝关注数据

image-20230425232805742

image-20230425232851743

image-20230425235033202

1
每日主播等级数据

image-20230425235117301

image-20230425235132504

1
验证成功之后呢,我们就需要把这个命令啊公布到脚本里面,首先是这个用户历史关注数据,注意这份数据啊。它其实呢是分表存取。你看我在这其实只是创建了一部分表,它有很多张。

image-20230425235203139

1
这个分表逻辑是什么样子的,你看它里面存储的是什么呀,是一个用户的一个UID啊。所以说这样的,它会根据这个用户的UID来计算一个MD5值。你这个MD5值是什么样子的呢?给他搜一下。对吧,你看它的MD5值其实就是这样。就是类似于这么一串。

image-20230425235426902

1
2
3
我们呢,会根据这个用户的UID去计算一个MD5值,然后呢获取最后两位字符,然后呢拼接到这个follow后面。组装一个新的表名。这样话呢,他会算一下你那个UID最后两位是什么,然后呢,存到对应的表里面。这是它一个分表逻辑啊,那你后期查的话呢,也会根据这个UID计算MD5获取最后两位,然后呢来前面呢拼那个follow下划线,后面的话拼上了两位,这样就找到它对应的一个表。

注意这种组合呀。这两位你看它有可能是零到九以及a到z中的任意一个,所以说呢,这位它呢有36种情况,这一位呢也是36种情况,你26个英文字母加上零到九有十个,一共是36,每一位都三十六三十六乘以36是吧。1296张表。所以说呢,我们其实在实际过程中,我们需要将这1296张表中的数据全部都导出来,所以我在这呢,也没有建那么多张表,太多了啊,所以说在这建了一部分,在这大家知道它是一个分表的就行啊。然后呢,你要知道它这个分表规则。OK。这样的话,一会我们采集数据,我就先采集一张就行,但是呢,我会写脚本,让他可以支持把这个1296张表中的数据全部都采集出来。我们把那个脚本写好,但是具体采的时候,我们就只采这一张表就行啊,因为就它里面有数据。

extractFollower.sh

1
好,下面我们来开发第一个脚本。我现在来写。简单号。它保存一下啊,给它起个名字叫attracted。要抽取follow。是脚本。只需要执行一次即可。你只需要抽取自己的是历史数据,后期的话我们还有一份实时数据,就可以实时维护了。针对1296张表。需要使用这个双层或循环。动态生成表明。I。12345678。所以后面还有呢a。FGH。I z KL mn pqr。ST。UVWXYZ,好吧,这题又是个直接画啊度。这是一层,你里面还要套一层呢。因为我们要批那个表明的最后两个字符嘛,对吧。G。这个我就不是文桥了。50块。好。那现在里面了,我来艾打印一下。就把那个表明啊给他批出来。哎。当然,这行吧。其实你在这儿只要能把这1000多张表的那个表明全都拼出来,那继续把它导出去,那不就很简单了吗?对吧?我先在这呢,先写一个导师的一个脚本。马杠u杠P杠H。18291。藏意后面是circle。这个UIDUID。from。video。零零。注意这样的话就可以获取那些数据了,然后我把这个数据呢,给它重定向。到这个里面这个soft。video recommend。就使用这个表名作为文件里面的前缀log,这样的话其实就可以把这张表数据给它导出来了。你如果把这个东西放到这儿。好把重要啊,我们一会执行不执行,你把里面改一下是不是就行了,你这个东西。还有那个。这东西,然后那个是不是可以了呀。你只要说这个循环执行完,其实就可以把所有数据全都倒出来。我在这通过IO把这个表名导一下就行,行吗。最后的话呢,我们只是把这一个标点数据给它打开就行。好,那接下来呢。先试一下啊,重新在这儿来建一个脚本。video'。到了。OK,这样就可以了,那接下来我们来执行SH。看到没有,前面都打印出来了,没问题啊。那我们来确认一下,这个木下面有没有产生那个零零.log。产生了吧。看一下里面的数据。没问题吧,没问题啊好,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/bash
# 此脚本只需要执行一次即可

# 针对1296张表,需要使用双层for循环动态生成表名
for i in 0 1 2 3 4 5 6 7 8 9 a b c d e f g h i j k l m n o p q r s t u v w x y z
do
for j in 0 1 2 3 4 5 6 7 8 9 a b c d e f g h i j k l m n o p q r s t u v w x y z
do
echo follower_${i}${j}
#mysql -uroot -padmin -h 192.168.182.1 -e "select fuid,uid from video.follower_${i}${j}" >> /data/soft/video_recommend/follower_${i}${j}.log
done
done

mysql -uroot -padmin -h 192.168.182.1 -e "select fuid,uid from video.follower_00" >> /data/soft/video_recommend/follower_00.log

image-20230426000249498

image-20230426000303163

image-20230426000343050

image-20230426000403681

extractUserLevel.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/bin/bash

# 此脚本每天执行一次,添加到crontab或者Azkaban调度器中[每天0:30分开始执行]

# 正常情况下获取昨天的数据,如果需要补数据,可以直接指定日期
if [ "X$1" == "X" ]
then
yesterday=`date --date="1 days ago" +%Y-%m-%d`
else
yesterday=$1
fi

mysql -uroot -padmin -h 192.168.182.1 -e "select * from video.cl_level_user where update_time >='${yesterday} 00:00:00' and update_time <= '${yesterday} 23:59:59'" >> /data/soft/video_recommend/cl_level_user_${yesterday}.log

# 将数据上传到hdfs上,每天一个目录

# 先在hdfs上创建日期目录
hdfs dfs -mkdir -p /data/cl_level_user/${yesterday//-/}

# 上传
hdfs dfs -put /data/soft/video_recommend/cl_level_user_${yesterday}.log /data/cl_level_user/${yesterday//-/}
1
2
3
4
5
那接下来我们来开发第二个脚本。这个也是抽取数据了。他呢是抽取了一个user level,有这个主播等级的啊。注意在这个脚本中啊,我们需要将数据上传到hdfs上面,并且呢,这个脚本啊,也需要添加到Crontab或者azakaban那个调度器中,每天凌晨00:30了,然后执行一次。就是每天呢抽取一次啊。

那下面我们写一下,就是正常情况下。每天凌晨获取昨天的数据。你如果呢,你需要补数据的话。可以直接指定日期。所以这个的话,我来获取一个日期。呃,一。等等a。那说明道一为空。then。还有这个yesterday。等于。加个反引号。刚刚date。days ago,一天以前就是昨天嘛,对吧,加号百分号Y杠百分号M,杠百分号D。这就过去昨天日期了。else,如果说呢不相等,那说明了多少?有值,有值的话呢?二。这样就快。那下面的话其实就可以把我们那个circle语句给它拿回来,那个搜有点长,我在这呢。到这儿拿一下。我们在前面是不是写过呀。对,这里面写这个星号就可以啊,没问题。这边需要改一下。把那个日期拼上。注意你说我在这成这个单引号,它不是不解析吗?大家注意外面还有一层双引号,咱们之前讲过对吧,双引号里面这个单引号这个会解析这个变量啊。我反过来又不行。那最后呢,我们把这个日志数据呢,给它重进现到这个目下面,在这我可以把前面这个复制一下是吧过来。后面的话呢,其实就是这个。表明了。使它来拼接一个热面。注意其在后面呢,你最好拼一个日期对吧,因为他每天都有一次。填了一个日期。点到这就可以。注意,那你到这还没完,我们还需要把这个日上传到HDS上面。杨淑玉上传。的X元。每天一个目录。所以你在这呢,你先在hps上创建日期目录,因为每天一个目录嘛,对吧。CS杠那个地可以加个杠P对吧。it。后面呢,我们就使用这个level user。然后后面把这个复制过来。大家注意这个日期格式呢,中间是带这个横杠的年月日,我们其实想获取这个不带横杠的。但是由于这他确实需要带横杠的,那我们这又不想要怎么办呢?那你就改一下呗,是吧。把横杠给替换掉就行,对吧,全部横杠替换成空,这样的话就是这个年月日中间不带任何分隔符了。这个用法咱们前面也讲过吧。还没上班。高foot。那就是这。把它呢,上传到这个目录下。这一块。

好,注意,我这个脚本其实就是一个增量脚本,咱们前面说了,你在最开始的时候,其实啊,你还需要将这个表里面的之前的全量数据也给它导一次。那个脚本我这里先不写。我直接写了一个每日的增量脚本啊,大家要知道这个事情。因为我们现在这里面这个数据其实都是认为是一些增量数据啊,我直接。使用这个日期过滤,就可以把里面所有东西全都过滤出来。

image-20230426003329063

1
2
3
下面我们来确认一下。杠,我们来查一下这个date。小level user。有吧?再来查一下这个。没问题吧,有数据啊,那我们来看一下里面内容呗。对吧,这就是我们采集过来的内容。

但是你在这需要注意点,它是有一个表头的。所以说呢,我们后期啊,在处理这个数据的时候,需要把这些数据给它归掉就行。那到此为止,服务端数据库中的数据我们就采集完毕了,那在这里面啊,大家需要熟悉我们的数据来源,以及数据最终的存储位置。后面我们的计算程序呢,都需要依赖于这些数据。

image-20230426003427969

image-20230426003455794

数据计算核心指标详细分析

image-20230426104322881

第一个任务

1
2
3
大家好,前面我们把项目需要的数据都采集过来了,下面我们开始基于这些数据开发数据计算模块。由于我们需要计算的指标比较多,所以呢,我们先对需要计算的这些核心指标呢进行详细分析。来看一下这张图。我们从左往右分析,首先看左边第一列,这里面表示的是计算程序的数据源,其实就是咱们前面分析的那五种数据。服务端日,客端日加上服务端数据库数据。

那既往右边看。注意,右边这里面列出来的都是具体的计算任务。那我们首先呢,来看一下第一个任务。它表示的是历史粉丝关注数据的初始化,这个任务负责把服务端数据库中的历史粉丝关注数据批量初始化到neo4j。这个任务呀,只需要运行一次即可,后面就不需要了,它属于一个临时的任务,因为后续的粉丝关注数据就通过第二个任务来实施维护了。

第二个任务

1
那下面呢,我们来看一下第二个任务。这个任务呢,是实时维护follow和UN follow关系,follow呢就是表示关注,UN follow表示取消关注。这个任务是需要一直运行的,他会实时维护粉丝和主播的关注情况。它的数据来源是kafka中的实时粉丝关注数据,这个是通过我们的日志采集程序采集过来的。那通过前面这两个任务呀,就可以将我们全平台的粉丝关注数据进行全量维护了。第一个任务负责历史粉丝关注数据的迁移,第二个任务负责实时维护,这样在neo4j中就维护了全量的粉丝关注数据。

第三个任务

1
那接下来我们来看一下第三个任务。他需要每天定时更新用户最新活跃时间。这个任务的主要作用呢,是为了维护所有用户的活跃时间。因为最终我们在统计三度关系的时候,如果是针对全平台所有的用户都计算,这样是没有意义的。为什么呢?我们来画一个图,再来分析一下这个三个关系的一个流程。

image-20230520175733543

1
2
3
4
5
这个呢,是一个主播,主播一吧。好,这个主播呢,他有一些粉丝。这些粉丝是不是还可能会关注了其他主播呀?所以说在这呢,我们再画几个主播。你看这时候如果说有一个用户。当这个用户呢,你看他去关注这个主播一的时候。其实呢,我们需要把它对应的一个三度关系推荐给这个用户,对吧,其实就是这块这些主播对吧。它们两个之间的一个关系,它到它是两个关系,再到它是三种关系好。注意这里面这个主播以及粉丝呢,其实都属于用户,只是说呢,他的角色不一样吧。对于我们平台而言,所有的主播以及粉丝都属于用户。

那我们在计算三的关系的时候,针对这里面这个主播和粉丝最好呀,都是最近活跃过的。如果说这些粉丝很长时间都不回来了。那么我们认为他的这些关注数据啊,这个参考价值就没那么大了。如果这些主播呀,也是很长时间都不活跃了,那么更不应该进行计算。因为最终假设把它计算出来的话,就会把它推荐给用户。那用户关注他了之后呢,就想去看一下他最近的开播视频。结果发现这个哥们呢,很久都没开播了,那你这种推荐就没有任何意义了,其实就浪费了一次机会对吧。

所以后期啊,我们在计算三度关系的时候,会对这里面所有的用户,不管你是粉丝还是主播,都会对他的一个活跃时间进行过滤。当时我们在实际工作中使用的那个时间,是根据最近一周活跃过的用户来计算三种关系。就说你这里面这些主播以及粉丝都要是最近一周活跃过来,如果没有活跃过,我在计算三种关系的时候,就不把你们计算在内,直接把你刨除掉。OK,那这个就是第三个任务。这个任务呢,它是每天执行,因此每天凌晨根据昨天的主活数据来维护每个用户的活跃时间。当我们后期计算三个关系的时候,会根据这个条件过滤出来满足条件的用户。

第四个任务

1
好,接下来我们来看一下第四个任务。这个任务啊,主要是在neo4j中维护主播的最新等级信息,因为我们在计算三度关系的时候,还需要对最终计算出来的主播等级进行过滤。如果主播等级太低,那就没必要推荐了。这个任务呢,也是一个离线任务,每天执行一次即可,它的数据呢来源于hdfs,这个数据是我们通过脚本每天定时从服务端数据库中找出来的。

第五个任务

1
那接下来我们来看一下第五个任务,这个任务呢是每周一计算一次,每周一会到HDPS上面获取最近一个月的视频数据。计算最近一个月内视频评级满足3B加或者是2a加的主播。注意视频等级呢,主要有sabcd这五个等级,S是评级最高的,D是最差的。这里面的3B+表示最近一个月至少要有三次开播,并且最近三次开播视频的评级需要是B级以上,包含B级。那这里面的2a+表示最近一个月至少要有两次开播,并且最近两次开播视频的评级需要是a级以上,包含a级。这两个指标,一个是考虑主播的开播频次比较高,但是呢,视频的评级不是特别高,这样也是可以推荐的,相当于它是一种勤劳的主播。还有一种呢,是主播开播频次比较低,但是呢视频评级特别高,比较优秀,这种呢也可以推荐啊。所以说你只需要满足3B加或者2a加都是可以的。
1
2
3
4
5
6
7
8
那最终啊,我们把满足条件的主播计算出来,然后呢到neo4j中进行维护。给主播增加一个flag属性,如果主播最近的开播数据满足3B加或者是2A加,则把这个flag属性置为一。后面在计算三度关系的时候呢,会对主播的这个属性进行过滤,不满足条件呢就不推荐了。那到这为止,前面这五个任务啊,都是对这个neo4j它里面这个数据进行维护的。其中第一个任务呢是初始化数据的是执行一次。

因此。第二个任务呢,是一个实时任务,需要一直执行,其实这个任务呢,也可以改为离线的,一天执行一次,我们当时主要考虑的是这样的neo4j中的那个粉丝关注数据啊,我们会有其他业务部门也在使用。所以说呢,我们就让这个任务实施维护了。那第三个和第四个任务呢,是每天只一次。第五个任务,本来啊,也考虑每天执行一次,但是呢,每天都计算最近一个月的数据,这样太频繁了,浪费计算资源,所以呢,我们就改为了每周一计算一次。因为我们最终的三个关系数据也是一周计算一次。所以这里面啊,这些离线任务,它的一个计算周期,其实最迟都是一周。所以说第三个和第四个任务其实也可以每周一计算一次。不过当时呢,是因为这些指标,其他业务部门在使用用户率的时候呢,也会用到,所以说呢,我们就每天执行一次。

那我在这呢,说这么多呀,主要是为了让大家明白,为什么有的任务需要实时,为什么有的任务需要离线,以及离线的任务为什么还要分为一天进行一次和一周之间一次。这些都是由一些具体的业务环境而导致的。

那到现在为止,你neo4j中的数据是这样。假设我们这里面创建的是一个user这个节点,对吧,这个节点的话,它俩具备以下这些属性。UID。以及呢,它里面有一个时间桌,代表它的一个活跃时间。就是最近的一个活跃时间。以及呢,有一个等级叫level对吧,主播等级。还有一个flag。表示啊,你这个主播最近这一个月,你这个视频平移是否满足3B加或者2a加,如果满足它的值就是一,不满足就是零了。
好,所以说呢,经过前面这五个任务在neo4j里面,我维护的数据其实就是这样。

image-20230426110524589

第六个任务

1
2
3
那接下来我们来看一下第六个任务。这个任务呢,是计算满足条件的主播的三度关系推荐列表。在计算的时候会根据前面几个任务的指标作为过滤条件。首先第一点在这里。他俩需要过滤出来最近一周里活跃过的。第二点呢,需要过滤出来主播等级大于四级的。第三点再过滤出来,最近一个月开播视频,满足这个3B加或者2a加这个条件。第四点,再过滤出来推荐列表中粉丝列表关注重合度大于二的。那根据这四点过滤之后呀,最终计算的三个关系列表数据才是我们需要的。

第六个任务,计算好的数据会先存储到hdfs上面,这个任务是每周一计算一次。因为这个三种关系出现,数据的更新力度没有必要细化到每天。如果每天都计算一次,最终的结果数据变化也不大,并且每天都计算一次,对计算资源的要求也比较高,所以当时我们定的是每周计算一次。更新一次最新的三度关系列表数据。

第七个任务

1
数据计算出来以后,会通过第七个任务把这个数据导出到MYSQ里面。这样就可以通过MYSQL对外提供数据了。这就是数据计算模块中的详细计算流程。下面呢,我把这个数据计算步骤啊做了一个汇总,我们来看一下。

image-20230426110912322

image-20230426110948070

image-20230426111106202

1
第一步,历史粉丝关注,数据初始化。第二步,实时维护粉丝关注数据。第三步,每天定时更新主播等级。第四步,每天定时更新用户活跃时间。第五步,每周一计算最近一个月主播视频评级信息。第六步,每周一计算最近一周内主活主播的三组关系列表。第七步,三种关系列表数据导出到马,这个就是具体的我们的计算步骤。

image-20230426111116924

数据计算之历史粉丝关注数据初始化(第一个任务)

1
下面呢,我们首先来看一下数据计算中的第一步。历史粉丝关注出于初始化。咱们前面分析过啊,历史粉丝关的数据呢,来源于服务端数据库,并且呢,这些数据在存储的时候还是分表存储,一共1296张。这个表里的数据格式呢是一样的。fuid、uid,还有一个time stamp。这个fuid呢,表示关注者,其实呢就是粉丝。这个UID表示被关注者的UID,其实就是主播。这份数据呢,在前面开发数据采集模块的时候,我们已经生成过。那接下来我们就要把这个数据啊,初始化到neo4j中。那我们需要把之前生成的这个文件呢,上传到neo4j的一个import下面才可以使用。

image-20230426111700471

1
我们之前导出的这个粉丝关注数据在这就这个文件,那我们需要把这个文件给它复制到import目下面。嗯了,好,这样就可以了。那下面呢,我们在初始化这个数据之前啊,我们把那个neo4j重新初始化一下,就把里面数据全给它删掉。因为之前里面有一些特殊数据。把那个data目录删掉。

image-20230426111836337

image-20230426111852239

1
停一下。嗯。在其中。好,这样的话就可以了,注意因为你在这把那个data目录删了之后啊,你再重新启动之后呢,我们需要去修改一下密码。

image-20230426111933953

1
那接下来我们需要到这儿来初始化数据了。我们在这先连到这个bin/cypher-shell这里面。bin/cypher-shell -a bolt://bigdata04:7687 -u neo4j -p admin。

image-20230426112214786

1
2
3
4
5
6
7
8
9
10
11
12
好,那这里面呢,我们首先要针对这个关键字段建立索引,对吧。咱们前面是不是已经做过这种操作呀。所以下面呢,我们还需要执行一个这个。好,执行成功,那我们到界面上来确认一下数据。好,是可以的。
neo4j> CREATE CONSTRAINT ON (user:User) ASSERT user.uid IS UNIQUE;
0 rows available after 281 ms, consumed after another 0 ms
Added 1 constraints
然后批量导入数据
neo4j> USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM 'file:///follower_00.log' AS line FIELDTERMINATOR '\t'
MERGE (viewer:User { uid: toString(line.fuid)})
MERGE (anchor:User { uid: toString(line.uid)})
MERGE (viewer)-[:follow]->(anchor);
0 rows available after 791 ms, consumed after another 0 ms
Added 11 nodes, Created 17 relationships, Set 11 properties, Added 11 labels

image-20230426112713190

image-20230426113352894

1
注意,我们这儿呢,只初始化一个数据文件。那你说真的,我们实际工作中啊,我们有1000多个这种数据文件,你需要让我在这执行这个命令,执行1000多次吗?这肯定是不合理的。那如何一次性批量导入呢?可以这样来做啊。我们可以把这多个文件合并成一个大文件,对吧,你在Linux命行里面,你看这个文件,然后呢,通过这个右箭头>>重定向。把这个1000多个文件里面那个数据啊,整合到一个大文件里,这就可以了。好,那这样的话呢,针对数据计算的第一步初始化数据,这个我们就搞定了。

本文标题:大数据开发工程师-第十八周 直播平台三度关系推荐v1.0-2

文章作者:TTYONG

发布时间:2023年04月24日 - 15:04

最后更新:2023年05月20日 - 18:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%85%AB%E5%91%A8-%E7%9B%B4%E6%92%AD%E5%B9%B3%E5%8F%B0%E4%B8%89%E5%BA%A6%E5%85%B3%E7%B3%BB%E6%8E%A8%E8%8D%90v1-0-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%