第十八周 直播平台三度关系推荐v1.0-2
数据采集架构详细设计

1 | 大家好,下面呢,我们就从我们整体架构里面的第一个模块数据采集模块开始。注意,在实际过程中,数据采集模块不是只针对某一个项目而言的,而是一个公共的采集平台,所有项目依赖的数据全部都来源于数据采集模块,所以在设计采集模块的时候要考虑通用性。不能仅仅是为了这一个项目而服务。 |
数据来源分析

服务端日志数据
1 | 下面我们来分析一下,针对这个项目,我们需要采集哪些业务类型的数据,以及这些数据来源于什么地方。首先是服务端日志数据。什么是服务端日志呢?可以这样理解。就是我们在APP中点击一些按钮的时候,例如我们要关注一个主播,这个时候当我们点击这个关注按钮之后。APP呢,它会去请求对应的接口。接口中的代码逻辑就是将我们关注的数据保存到数据库里面。同时,这个接口也会记录一份日志。因为这个接口呢,是为APP提供后台服务的,所以它记录的日志我们称之为服务端日志。接着我们来简单画一个图来看一下。 |

1 | 这个呢,是隔壁老王。那是我们的一个用户。画的形象一点。这是我们的一个APP。我们这个APP里面呢,它有一个关注功能。这时候呢,你看老王啊,他会点击这个关注功能。要关注某一个主播。当他点击这个关注功能之后,这个APP里面这个关注功能,它对应的它会调用一个接口。那我们这块呢,有一个后台服务器。这个服务器里面呢,部署的有一个接口服务,就是这种HTTP接口。那对应的这个关注功能,其实呢,还会调那个接口。所以你在这呢,点击APP里面这个关注按钮。它底层啊,其实会调这个接口。这个接口呢,其实呢,它会操作一个数据库。相遇老王在这儿呢,关注了一个主播,最终啊调这个接口。接口呢去操作这个数据库,最终把老王关注了,谁把这个数据呢存到数据库里面。就是这个逻辑啊。那这个时候注意我们在这个接入服务里面呢,因为它俩现在也是一个外部项目,所以什么它里面可以记录日志,那这里面记录的日志呢。我们就把它称为是服务端日志。 |
服务端数据库数据

1 | 那接下来我们来看一下服务端数据库中的数据。注意服务端数据库中的数据啊,其实啊,就是我们刚才图里面这个数据库里面的数据。来看一下。就这块儿。就相当于啊,我们App某一些功能呢,它会调用这个后台的接口,然后调用这个接口之后呢,最终啊,其实操作的是这个数据库。我们所说的这个服务端数据库的数据,其实说的就是这块它里面的数据。那在这个项目里面,我们主要获取历史粉丝关注数据,以及呢主播等级数据。这里面我们需要历史粉丝关注数据,因为我们在做这个项目的时候,我们的直播平台已经运营了两三年了。所以说,我们需要把历史粉丝关注数据初始化到图数据库中。这些历史数据服务端存储在数据库中,所以说我们需要从数据库里面去取。还有就是这个主播的等级数据。其实这个数据呢,在服务端日志中也有,但是我们考虑到服务端数据库中的数据是最准确的。特别是针对用户相关的数据,最好是以服务端数据库中的为准。所以说呢,我们就从那个服务端数据库中,每天凌晨啊,定时把昨天等级发生了变化的这个主播等级数据呢,导入到hdfs,方便我们后面的离线计算时。 |
客户端日志数据

1 | 最后呢,我们来看一下客户端日志数据。刚才我们分析了服务端日志。 |

模拟产生数据


1 | 前面我们分析了具体需要什么数据,以及数据从哪里来,下面我们来看一下如何通过代码模拟产生这些数据。先看这个图。我们通过执行generate date这个项目中的这五个入口类,可以模拟产生这五种数据。就是咱们前面分析的那五种数据。那在这里注意一下,因为我们不能直接使用企业中的真实数据啊,所以在这里我会根据企业中真实数据的格式去模拟生成。最终的效果是没有区别的。 |
1 | 第一步: |
1 | 第二步: |
1 | 第三步: |

1 | 接下来执行代码,开始模拟产生数据 |
1 | 2:【服务端日志】视频数据:GenerateVideoInfoData |
1 | 3:【服务端数据库】历史粉丝关注数据:GenerateHistoryFollowData |
1 | 4:【服务端数据库】主播等级数据:GenerateUserLevelData |
1 | 5:【客户端日志】用户活跃数据:GenerateUserActiveData |
1 | 至此,项目中需要的数据都可以正常产生了。 |
1 | 那接下来第一步我们先对这个data collect这个项目给他呢编译打包。嗯。CDDB。video。recommend。C的这个data collect。肯定package。港地skip test。好,编译成功。那把它上传到我们这个服务器上面,注意在这呢,我统一都上传到我们这个BD的零四这台机器上面。那接着呢,我统一再建一个目录。每个点啊。V6。recommend。所以说呢,针对这个项目相关的一些架包啊,那些东西全部都放到这个目录里面,这样方便管理维护啊。那我们进到这个目录下面,注意现在里面呢,我在创建一个目录。叫data collect。那这里面呢,就放我们这个data这个接口,这个夹包。打开这个图形化界面。把我们刚才生成那个架包给他传上去。it soft。在这。把这个家包传上来。好,那下面呢,我们把它启动起来。Java杠架。后面呢,直接指定那个另一个加包就行了,注意我们需要把它放到后台运行,所以说呢,前面加个no ho。后面呢,加个and符,这样就可以了。GPS一下。看到没有,这个架已经启动了,你可以这样GPS杠ML。这样看起来更加清晰,对吧,就这个data collect。那下面呢,我们来测试一下这个服务啊是否正常,因为它是一个接口服务。这里面呢,我们其实开放的有一个测试的一个接口。看到没有?你直接调用这个就行,它是一个测试结果,当然你在这需要传一个参数啊,它需要提出一个参数name。他会把它打印出来,这样可以确认一下你这个服务是否正常啊。这个怎么验证呢?它是一个HTTP请求,你可以选择在我们的浏览器里面去操作,或者说呢,我们直接在这个控制台里面也是可以的,通过curl这个命令杠X。盖着操作啊。HTTP冒号双斜线logo。冒号,注意它这个端口呢,是8080。我在这指定了。看到没有8080啊。VT对吧。name。等于test吧。你看状态是200是OK的啊,说明我那个服务呢是正常启动了。那接下来第二步,我们把这个server这个项目呢,给它编译打包。有这个项目。他是负责写收我们那个服务日志的。嗯。clean港。开始。好,编译成功。把它传上来。注意,那我们在这需要创建一个目录啊。你可以在这直接对吧,右键这样创建也是可以的啊都可以。serve。因此。嗯。好,上传成功。那这个呢,我们要把它启动一下。no Hu Java刚加。安福。嗯。先确认这个服务是不是在啊在吗。好,这个呢,我们也来验证一下。大家注意它这个接口名称呢,就不是这个,你可以到这儿来确认一下。这是S1T1对吧,以及呢,它的端口注意。因为我们是在同一台服务器上启动的,所以说呢,这两个项目啊,它监听的端口肯定是不能重复的啊,这个我给它改成80812。四上这个是8081。后面的是S1T1对吧。嗯。好,OK啊,还是这个size。好,那这样的话,这两个接口服务呢,就搞定了,这个以这个都部署好了。那接着第三步,我们需要把这个MY面这个数据库啊,还有表啊,给它触发好,这个呢,给大家提供了有这个触发脚本也比较方便啊。现在这右键。进行一个so文件。直接把这个脚本。指另外就行了。就是in my circle tables啊。这个脚本。好,执行成功。在这刷新一下。好,它呢,会产生这么些表啊。OK,这个后面我们具体用的时候再来具体查看。那接下来呢,我们就需要去执行这些代码了。这样的话呢,就可以模拟产生数据了。首先呢,我们先产生这个实时粉丝关注数据。就这个服务端呢。啊,服务端日志。模拟生成实时粉丝关注和取消关注数据。注意了,大家在下面呀,在执行我这个代码的时候,需要注意,你们需要改点东西。注意在上面,你看这个是调用我线上这个接口来获取这个模拟数据,获取到之后呢,注意。下面呢,会调用我们本地刚才部署的那个接口服务。因为这个呢是服务端日志,所以说它会调那个服务端接口。我呢是在BD的零四这台机箱部署的。对吧,这个服装接口,它这个对应的端口号是8081。对吧。后面这些东西呢,都不用改,你们下去改的话,主要改一下这个对吧,你在哪一台机器上去部署的这个什么server这个接入服务,那你在这就把那个IP或者主机名写到这就可以了。所以接着的话,就可以模拟产生这个服务端的这个调用请求,最终呢,去调用这个服务端这个接口,让这个接口呢记录日志。那你说这个接口,它具体把日志记录到哪个目录下面呢?注意,在这也能看懂啊。在resources这个目录下面有一个log back点查明。看到没有,我把这个日呢记到这个Di这个母下面了。然后他这块呢,具体的一个日志文件名称是server下换in加log。那对应的这个对collect看了也有。它日志呢,也是放到这个电log这个目录下面。然后它的日文名称呢,叫data collect。所以说这个到时候可以通过这个文件名就知道到底是哪个接口服务记录的日志啊,也好分析。那我们这个代码呢,都是OK的,以及这些接口这些东西呢,我们也不需要改,对吧,也都是OK的,下面呢,我们来执行一下。注意这个代码呢,它是实时产生数据,现在呢,你在这执行一次,它会产生一条数据,就是模拟那个实时产生啊。看到没有接口调用成功,注意这块是我在这记录的日志啊。借口立项成功,它最终产生的日是这个。好,那我们来看一下,我们具体去调这个服务端接口,这个服务端接口有没有把这个日给记录下来了,我们来确认一下。在这重新克隆一个绘画。对,log。到这个木下面。看到没有?这个里面其实已经有值了,对吧,我们可以在这摸看一下啊。没问题吧,他已经记录下来了,说明我们这个流程目前是通的啊。这个呢,就是服务端的实时粉丝关注数据。那接下来往下面看。这个generate video for。这个也是服务端日志对吧,是模拟生成视频相关的数据。就是你这个主播开播结束之后呢,其实它就会调用服务端接口。把当前这个直播的相关的一些信息上报过去啊。那注意你这个在用的时候也是一样的流程,对吧,这个上面还是就我那个接口。那下面的话呢,你需要在这对应改一下对吧。好,那我在这来执行一下。注意这个呢,一次性我可以采用多条数据啊。你看接口交换成功是OK的。那下面注意我们来确认一下数据。因为这时候它里面产生数据比较多,我就使用T杠一吧,查看里面最新的一条数据。对吧。也怪了。OK,所以说这个呢,是服务端的这两种类型的数据都采集过来了。往下面看。那接着呢,我们来看一下这个服务端数据库里面的数据。首先呢,是这个。就是这个历史,粉丝关注数据。注意。这个电板执行之后呢,它会把这个数据啊,初始化到我们那个数据库里面啊。注意,你在执行这个代码之前,你需要先执行我这个脚本。进行这个数据库和表的一个初始化。初始化之后的话就可以直接执行了,这里面别的不需要改动了,注意你要改点东西。看到没有,要改一下数据库啊。你的一个数据库在哪,把那个IP地址啊,以数据库啊,一个名称啊,对吧,用户名啊,密码啊对吧,这些东西都改一下就可以了。来,我们来执行一下。从那个里面你可以看出来,他最终把数据啊,写到这个FOLLOW00这个表里面。好,我们到这来看一下。好,数据呢,都写进来了。这些测数据啊。这个呢,其实就是历史的一个粉丝关注出去了。那接下来注意我们来看这个主播等级的。也是这个服务端数据库里面数据模拟生成主播等级数据啊。它也是操作数据库。那这里面的话,他会把数据写到这个level user这个表里面来执行一下。OK,我们来确认一下。好,数据都写进了。那这样的话,相当于服务端数据库里面这两份数据呢,我们也都给他触发好了。那其实还剩一份数据,就是这个客端数据,就是那个用户活跃的。gena use active。对吧,客端是模拟生成用户活跃数据。这个呢,还是通过调用我这个接口获取数据。那下面注意这时候呢,它会调用我们那个客户端的那个日志收集服务器,对吧,其实就是一个client。他们最终会调用他,把数据上报给他。所以你的用处呢,也要改成这个,看一下你这个data client,你部署到哪个机器,把这个改一下就可以了。来执行一下。好,这个data可爱呢,他接收到用户上报的这些数据之后呢,他呢也会把它记录日志记录到本地。嗯。嗯。在这他又起到这个did collect点里面。我们取头一条还杠一。看到没有,就这个这个类型是user active,看到没有,都是接格式的啊。那到此为止呢,项目中需要的数据呢,我们都可以正常产生了。 |
客服端日志接收服务


服务端http接口



实时粉丝关注

视频数据

历史粉丝关注


历史主播等级

客户端用户活跃日志

数据采集聚合

1 | 咱们前面把数据产生好了,下面就可以进行数据采集了,首先呢是使用这个filebeat呢,将所有日志数据采集到kafka的一个topic中中,把这个日志数据采集到kafka里面。日志数据有三份,服务端有两份,客户端日志有一份。注意,在这具体执行之前,我们需要先把这个zookeeper以及kafka这两个服务给它启动起来。 |

1 | 那接下来我们呢,就可以使用filebeat的去采集数据了,注意现在filebeat呢,我们还没有安装呢。需要安装到哪些机器上面呢?注意你要看你之前那个客户端的那个接口,以及服务端的接口,你都部署到哪些服务器上面,我在这呢,都给它部署到这个bigdata04上面了。因为filebeat呢,要采集data_collect这个接口记录的日志,以及server_inter这个接口记录的日志,所以说呢,你后期这两个接口你部署到哪台机器上面,那么你在对应的机器上就需要部署这个filebeat去采集数据。 |



1 | [root@bigdata04 filebeat-7.4.2-linux-x86_64]# ./filebeat -c filebeat.yml |

1 | 输出了一堆东西。虽然是一堆啊,还是有格式的,是一个阶层格式的,注意这个呢,就是因为我们在这加了这个属性,等于true,它呢会对你返回的数据呢,你做一个格式化。反馈的数据,你看就这个message里面内容,其他内容呢,都是它里面默认加的啊。接着你想停掉的话,直接CTRLC对吧。 |

1 | 接下来我们需要配置输出。这是一个输出组件,注意在这我们需要指定那个kafka的一个输出组件,那在这都需要指定什么配置呢?这个信息呢,到他那个官网上面是可以找到的啊,所以在边我就直接拿过来,你看output.kafka,把数据呢,输入到卡夫卡里面,直行卡卡的一个博物块地址对吧。123。还有这个topic,我们要把这个数据呢写到这里面。下面这几个配置你不用改就行了,这个呢表示啊,它往这个topic里面这些分区写数据的一个规则。这个表示卡里面那个艾机制是吧。这个呢是就是往里面写数据的时候,对这个数据15度压缩,这块呢,使用那个机会压缩可以提高性能。这个呢,表示呢,每一条数据最大的一个字节数量。好这样就可以了,你们下用的时候呢,有可能要改这以及这对吧,如果说你都和我的一样的话,其实都不用改。OK,这样的话就可以了,下面呢,把这个配置文件重新再传一下。 |

数据分发
1 | 下面呢,我们来看一下数据分发层,就是使用实现对采集到kafka指定topic里面的数据啊进行分发。咱们前面呢,把这个日数据采集过来啊,全部都放到kafka一个大的topic里面了,所以说下面呢,我们需要使用flume对这个大的topic里面数据啊进行分发,分发到一些小的topic里面,这样可以方便使用。注意,那这样的话,我们就需要在这个flume里面增加一配置文件,就是从那个大的topic里面消费数据,通过拦截器获取数据中的一个type自段的值作为输出kafka的一个topic的名称。这个配置文件呢,在这儿我已经写好了,我们来直接看一下 |
1 | #source+channle+sink的名字 |
1 | 那接下来就把它呢传上去就可以了,注意如果说我们需要在一台机器中启动多个flume进程的时候,最好在里面复制多个conf目录,因为如果在一个conf目录中启动多个agent的进程的话,多个agent的进程的日志信息会混到一块,后期排查问题会很麻烦啊,这个我们之前呢讲过了对吧啊,所以说呢,在这。进到这个。都门面。我们来复制一个。打靠谱。告不告?卡不卡?高不高相以我们从卡夫卡里面读出去,再把数据写到卡夫卡里面。进到里面。改一下他这个log的这个配置文件。把那个改一下,搞搞不搞搞卡不卡。好,这样的话就可以了。嗯。OK,那接下来我们在这里面来创建这个配置文件,就是刚才我们分析的这个。敢不敢杠后杠?敢不敢点。com?嗯。我们就组织一下。好,这样就可以了,注意这块搞定之后呢,我们就需要往下面走了,在这啊,我们也先不启动啊,那我们把这个数据落盘这块也搞定之后呢,从后往前启动。 |


数据落盘

1 | 接下来我们来看一下这个数据落盘层。我们需要使用采集指定topic各种数据进行落盘,便于离线计算。你看咱们前面呢,把这个数据呢,全部都采集到kafka这个大的topic里面,接下来做了一个分发,分发之后后面我们就要做这个落盘了。把我们需要落盘的数据呢,给它呢,落到这个HDFS上面,注意如果这里面这个大topic里面的所有类型的数据呢,我们都需要落盘的话,我们就可以直接读取这个大topic里面的所有数据,然后呢,使用这个拦截器。根据数据类型把它们呢保存到hdfs的多个目录中,这样呢比较方便,不过在我们这个项目中,这个大的topic里面一共有三种类型的数据,其中呢,有两种数据我们是需要进行落盘后期进行离线计算的。有一种数据呢是不需要的,只有实时计算上讲会用的,所以说我们可以使用两个flume agent来对我们需要的数据执行落盘操作。当然了,我们也可以啊,只使用一个flume agent的,那你这里面啊,也可以使用拦截器,只获取我们需要落盘的那两种数据,这样也是可以的,这个呢,给大家留一个作业,大家下去自己研究一下。使用flume拦截器如何呢?获取我们需要的两种数据,然后呢,分别把它呢落盘到hdfs的不同目录下面。 |

1 | 那我们进去去改一下那个logo的配置。那接下来我们还需要在这两个目录里面创建对应的一个配置文件,这两个配置文件啊,我也提前写好了,我们来看一下。 |
kafka-hdfs-user-active.conf
1 | #source+channel+sink的名字 |
kafka-hdfs-video-info.conf
1 | #source+channel+sink的名字 |
启动
1 | 然后将hadoop集群启动起来,因为数据落盘会使用到HDFS |
数据落盘
1 | 好 到这为止,采集需要的配置都修改好了 |


数据分发
1 | 2:再启动数据分发的flume |

数据采集聚合
1 | 3:最后启动filebeat采集进程 |

执行数据生成程序
1 | 4:接下来启动生成数据的程序 |


1 | 所以执行成功了,那我们来验证一下结果。我们到那个卡夫卡里面来消费一下。基于console的。注意这个内容还是比较多的,但是我们可以大致看一下啊,你看。这是一条数据,是不是很乱呀?我们的日志数据没有这么乱吧,并且我们的日志数据里面也没有这个东西。对吧,我们的日志数据里面其实是这些东西在这,你看它其实啊,其实在这面又封装了一层,他把我这个具体的业务数据啊,给封装到这个message字段里面。注意这些字段的话,相对来讲是filebeat默认生成的,那这些字段呢,它不是我们需要的,我们希望啊,只记录我们的原始日志即可。那怎么解决这个问题呢?可以解决啊,我们需要在那个filebeat那个配置文件里面加一个配置。需要在filebeat中的output.kafka里面增加codec.format配置。在这相当于啊,你要对它输出这个数据啊,做一个格式化。 |

1 | CD format。做一个格式化。四六类形的。对。我们呢,只取它里面的这个东西,哎,问号首先呢,括括号中括号我们只取里面那个message。所以这个配置的意思呢,就是说我们从这个数据里面。你看它这个采集过来数据里面有很多字段,我们只需要message字段里面内容,其实这里面就是我们的原始的日志数据。然后把这个filebeat再停一下。你再来启动,我们再来执行这个生成数据的这个。看到没有?这个就正常了,这样就可以 |

1 | 那接下来我们再来查看一下这个user_follow这个topic,这个看一看它里面能不能消费到数据,如果能消费到数据,就说明那个flume的数据分发过程是没有问题的啊 |

1 | 收到了吧。注意你在这能收到就说明啊,我们中间那个flume的分发程序是OK的,他从那个大的topic里面把这个数据读出来,然后呢写到这里,这样就可以好。那接下来我们再来执行两个程序。一个是这个active。其实呢,我们之前已经执行过了,在这我们再重新执行一下啊。然后重新呢,再生成一批数据,就是往那个日志文件里面再写一批啊,因为你调用接口,这个接口就会记录了,以及这个video info。好,这个执行成功之后呀,注意我们就可以到那个HDPS里面去确认一下结果数据了。因为你这个执行之后啊,他呢,这个接口就会把那个日志记录到本地。这个接口呢,就会把那个日志数据啊,记录到这个本地的日文件里面,然后filebeat呢,发现你里面啊有新增数据,所以说他就把这个数据给读出来,读出来之后呢,把它呢,采集到那个大的topic,就是all type data那个topic里面。然后呢,这里面有数据之后,后面那个的数据分发程序,它就会读取这个数据,对吧,对这个数据做分发,然后呢,把这个数据分发到不同的那个子topic里面,那我们最后呢,还有一个flume数据落盘层。他呢,就会从那个子topic里面把那些数据呢读出来,最终呢落到hdfs |
1 | 所以下面我们来验证一下。看到没有,这个user active,对这个都是有的。好,这个目录来我们直接查看一下它里面的数据啊。使用管道来取,前一条取一条就行,好里面有数据。那说明是OK的,那其实对应的我可以直接把这个改一下,改成video。看一下他们的数据。也是有了。没有问题,好。那在这我们都可以获取到数据,那就说明了是没有问题的,这就意味着我们前面的整个数据采集流程是通的。那大家在下面做实验的时候呀,我估计啊,可能会遇到各种各样的问题啊,就是大家在操作这一块的时候,如果发现最终啊,看不到我们这个希望的结果。那你就需要一步一步去排查,你要确认一个数据到了哪一步。你先看一下那个大套贝里面有没有数据,然后呢,再看一下那个子套贝壳里面有没有数,如果子套贝格里面也有数据,那最终hps里面没数据,那肯定是你那个数据落盘程序有问题。所以说你需要一点点去分析啊。 |



1 | 针对这个filebeat呀,我再多说一点,这个filebeat它在采集日文件中的数据的时候呢,它会将日文件数据的采集的一个偏移量啊,记录到本地文件里。所以说它在这会记录一下,这样话你filebeat给重启之后,它呢会读取这个文件,然后呢,根据你上一次记录的off继续往下面消费。它可以保证,就算你那个filebeat要停了,那在它停的中间,你往那个日里面记录数据,它后期启动之后还可以把它呢采集过来是这样。注意了,如果说呢,我们想要让这个filebeat的重启之后啊,继续重新开始消费。这样怎么办呢?暴力一点的话,我们可以直接把这个。这个data目录啊,给它删掉,你删掉之后这些信息是不是就没了,没了之后它就认为它是一个新文件,就从头开始读了啊,这个需要注意一下。 |

1 | 还有一点就是我们在使用filebeat的时候啊,如果发现filebeat没有正常工作,这个时候呢,我们需要去查看filebeat的日志文件,来排查具体是什么问题,因为有时候有一些错误信息啊,他不会暴露到这个工作台上面,它会记录到日志文件里面。像few b的下面有一个move。看到没有,它下面有这个这文件,注意你看的话就看这个,它后面这个相当于是一些备份的啊。你直接看这个就行。这是最新的一些日志。这些呢,是之前的一些老的日志。如果说他有一些错误信息在这里面就可以看。你在这儿可能看不到,它不会暴露到这个地方啊,你说呢,我把这个飞票启动了,也没见他报错呀,结果呢,他也没有把数据采集到我的卡夫卡里面,那所以说你就需要来这儿来看啊,看那个日文面。好,那针对服务端日志和这个客端日志的一个数据采集啊,我们先讲到这儿,在这呢,大家主要掌握数据采集的整理思路,重点是里面那个数据聚合,数据分发这两个步骤,那个数据落盘呢,倒没什么特殊的,对吧,咱们之前已经用过很多次。 |
采集服务端数据库数据
1 | 前面呢,我们把这个服务端日志以及客户端日志呢采集过来,下面呢,我们还需要将服务端数据库的数据也采集过来。咱们前面分析了啊,由于历史粉丝关注数据呢,只需要导出一次,所以说呢,没必要使用sqoop,那还有一份数据呢。是那个每日的主播等级数据对吧?所以这份数据呢,我们在最开始的时候会将数据库里面的全量数据导出来一份,后期只需要根据表中的更新字段获取发生了变化的数据即可,这样每天需要导出的数据量就很小了,属于增量采集。这时候呢,可以选择使用sqoop,其实呢也可以选择使用shell脚本,sqoop的使用在上一个项目中我们已经用过了,所以接着呢,我们来讲点不一样的,我来使用shell脚本,将mysql中的数据导出来,所以说么,这两份数据我全部呢使用mysql脚本。把它倒出来。那首先呢,我们使用这个mysql里面那个-e这个命令,将这个具体的查询命令啊准备好。 |
1 | 历史粉丝关注数据 |



1 | 每日主播等级数据 |


1 | 验证成功之后呢,我们就需要把这个命令啊公布到脚本里面,首先是这个用户历史关注数据,注意这份数据啊。它其实呢是分表存取。你看我在这其实只是创建了一部分表,它有很多张。 |

1 | 这个分表逻辑是什么样子的,你看它里面存储的是什么呀,是一个用户的一个UID啊。所以说这样的,它会根据这个用户的UID来计算一个MD5值。你这个MD5值是什么样子的呢?给他搜一下。对吧,你看它的MD5值其实就是这样。就是类似于这么一串。 |

1 | 我们呢,会根据这个用户的UID去计算一个MD5值,然后呢获取最后两位字符,然后呢拼接到这个follow后面。组装一个新的表名。这样话呢,他会算一下你那个UID最后两位是什么,然后呢,存到对应的表里面。这是它一个分表逻辑啊,那你后期查的话呢,也会根据这个UID计算MD5获取最后两位,然后呢来前面呢拼那个follow下划线,后面的话拼上了两位,这样就找到它对应的一个表。 |
extractFollower.sh
1 | 好,下面我们来开发第一个脚本。我现在来写。简单号。它保存一下啊,给它起个名字叫attracted。要抽取follow。是脚本。只需要执行一次即可。你只需要抽取自己的是历史数据,后期的话我们还有一份实时数据,就可以实时维护了。针对1296张表。需要使用这个双层或循环。动态生成表明。I。12345678。所以后面还有呢a。FGH。I z KL mn pqr。ST。UVWXYZ,好吧,这题又是个直接画啊度。这是一层,你里面还要套一层呢。因为我们要批那个表明的最后两个字符嘛,对吧。G。这个我就不是文桥了。50块。好。那现在里面了,我来艾打印一下。就把那个表明啊给他批出来。哎。当然,这行吧。其实你在这儿只要能把这1000多张表的那个表明全都拼出来,那继续把它导出去,那不就很简单了吗?对吧?我先在这呢,先写一个导师的一个脚本。马杠u杠P杠H。18291。藏意后面是circle。这个UIDUID。from。video。零零。注意这样的话就可以获取那些数据了,然后我把这个数据呢,给它重定向。到这个里面这个soft。video recommend。就使用这个表名作为文件里面的前缀log,这样的话其实就可以把这张表数据给它导出来了。你如果把这个东西放到这儿。好把重要啊,我们一会执行不执行,你把里面改一下是不是就行了,你这个东西。还有那个。这东西,然后那个是不是可以了呀。你只要说这个循环执行完,其实就可以把所有数据全都倒出来。我在这通过IO把这个表名导一下就行,行吗。最后的话呢,我们只是把这一个标点数据给它打开就行。好,那接下来呢。先试一下啊,重新在这儿来建一个脚本。video'。到了。OK,这样就可以了,那接下来我们来执行SH。看到没有,前面都打印出来了,没问题啊。那我们来确认一下,这个木下面有没有产生那个零零.log。产生了吧。看一下里面的数据。没问题吧,没问题啊好, |
1 | #!/bin/bash |




extractUserLevel.sh
1 | #!/bin/bash |
1 | 那接下来我们来开发第二个脚本。这个也是抽取数据了。他呢是抽取了一个user level,有这个主播等级的啊。注意在这个脚本中啊,我们需要将数据上传到hdfs上面,并且呢,这个脚本啊,也需要添加到Crontab或者azakaban那个调度器中,每天凌晨00:30了,然后执行一次。就是每天呢抽取一次啊。 |

1 | 下面我们来确认一下。杠,我们来查一下这个date。小level user。有吧?再来查一下这个。没问题吧,有数据啊,那我们来看一下里面内容呗。对吧,这就是我们采集过来的内容。 |


数据计算核心指标详细分析

第一个任务
1 | 大家好,前面我们把项目需要的数据都采集过来了,下面我们开始基于这些数据开发数据计算模块。由于我们需要计算的指标比较多,所以呢,我们先对需要计算的这些核心指标呢进行详细分析。来看一下这张图。我们从左往右分析,首先看左边第一列,这里面表示的是计算程序的数据源,其实就是咱们前面分析的那五种数据。服务端日,客端日加上服务端数据库数据。 |
第二个任务
1 | 那下面呢,我们来看一下第二个任务。这个任务呢,是实时维护follow和UN follow关系,follow呢就是表示关注,UN follow表示取消关注。这个任务是需要一直运行的,他会实时维护粉丝和主播的关注情况。它的数据来源是kafka中的实时粉丝关注数据,这个是通过我们的日志采集程序采集过来的。那通过前面这两个任务呀,就可以将我们全平台的粉丝关注数据进行全量维护了。第一个任务负责历史粉丝关注数据的迁移,第二个任务负责实时维护,这样在neo4j中就维护了全量的粉丝关注数据。 |
第三个任务
1 | 那接下来我们来看一下第三个任务。他需要每天定时更新用户最新活跃时间。这个任务的主要作用呢,是为了维护所有用户的活跃时间。因为最终我们在统计三度关系的时候,如果是针对全平台所有的用户都计算,这样是没有意义的。为什么呢?我们来画一个图,再来分析一下这个三个关系的一个流程。 |

1 | 这个呢,是一个主播,主播一吧。好,这个主播呢,他有一些粉丝。这些粉丝是不是还可能会关注了其他主播呀?所以说在这呢,我们再画几个主播。你看这时候如果说有一个用户。当这个用户呢,你看他去关注这个主播一的时候。其实呢,我们需要把它对应的一个三度关系推荐给这个用户,对吧,其实就是这块这些主播对吧。它们两个之间的一个关系,它到它是两个关系,再到它是三种关系好。注意这里面这个主播以及粉丝呢,其实都属于用户,只是说呢,他的角色不一样吧。对于我们平台而言,所有的主播以及粉丝都属于用户。 |
第四个任务
1 | 好,接下来我们来看一下第四个任务。这个任务啊,主要是在neo4j中维护主播的最新等级信息,因为我们在计算三度关系的时候,还需要对最终计算出来的主播等级进行过滤。如果主播等级太低,那就没必要推荐了。这个任务呢,也是一个离线任务,每天执行一次即可,它的数据呢来源于hdfs,这个数据是我们通过脚本每天定时从服务端数据库中找出来的。 |
第五个任务
1 | 那接下来我们来看一下第五个任务,这个任务呢是每周一计算一次,每周一会到HDPS上面获取最近一个月的视频数据。计算最近一个月内视频评级满足3B加或者是2a加的主播。注意视频等级呢,主要有sabcd这五个等级,S是评级最高的,D是最差的。这里面的3B+表示最近一个月至少要有三次开播,并且最近三次开播视频的评级需要是B级以上,包含B级。那这里面的2a+表示最近一个月至少要有两次开播,并且最近两次开播视频的评级需要是a级以上,包含a级。这两个指标,一个是考虑主播的开播频次比较高,但是呢,视频的评级不是特别高,这样也是可以推荐的,相当于它是一种勤劳的主播。还有一种呢,是主播开播频次比较低,但是呢视频评级特别高,比较优秀,这种呢也可以推荐啊。所以说你只需要满足3B加或者2a加都是可以的。 |
1 | 那最终啊,我们把满足条件的主播计算出来,然后呢到neo4j中进行维护。给主播增加一个flag属性,如果主播最近的开播数据满足3B加或者是2A加,则把这个flag属性置为一。后面在计算三度关系的时候呢,会对主播的这个属性进行过滤,不满足条件呢就不推荐了。那到这为止,前面这五个任务啊,都是对这个neo4j它里面这个数据进行维护的。其中第一个任务呢是初始化数据的是执行一次。 |

第六个任务
1 | 那接下来我们来看一下第六个任务。这个任务呢,是计算满足条件的主播的三度关系推荐列表。在计算的时候会根据前面几个任务的指标作为过滤条件。首先第一点在这里。他俩需要过滤出来最近一周里活跃过的。第二点呢,需要过滤出来主播等级大于四级的。第三点再过滤出来,最近一个月开播视频,满足这个3B加或者2a加这个条件。第四点,再过滤出来推荐列表中粉丝列表关注重合度大于二的。那根据这四点过滤之后呀,最终计算的三个关系列表数据才是我们需要的。 |
第七个任务
1 | 数据计算出来以后,会通过第七个任务把这个数据导出到MYSQ里面。这样就可以通过MYSQL对外提供数据了。这就是数据计算模块中的详细计算流程。下面呢,我把这个数据计算步骤啊做了一个汇总,我们来看一下。 |



1 | 第一步,历史粉丝关注,数据初始化。第二步,实时维护粉丝关注数据。第三步,每天定时更新主播等级。第四步,每天定时更新用户活跃时间。第五步,每周一计算最近一个月主播视频评级信息。第六步,每周一计算最近一周内主活主播的三组关系列表。第七步,三种关系列表数据导出到马,这个就是具体的我们的计算步骤。 |

数据计算之历史粉丝关注数据初始化(第一个任务)
1 | 下面呢,我们首先来看一下数据计算中的第一步。历史粉丝关注出于初始化。咱们前面分析过啊,历史粉丝关的数据呢,来源于服务端数据库,并且呢,这些数据在存储的时候还是分表存储,一共1296张。这个表里的数据格式呢是一样的。fuid、uid,还有一个time stamp。这个fuid呢,表示关注者,其实呢就是粉丝。这个UID表示被关注者的UID,其实就是主播。这份数据呢,在前面开发数据采集模块的时候,我们已经生成过。那接下来我们就要把这个数据啊,初始化到neo4j中。那我们需要把之前生成的这个文件呢,上传到neo4j的一个import下面才可以使用。 |

1 | 我们之前导出的这个粉丝关注数据在这就这个文件,那我们需要把这个文件给它复制到import目下面。嗯了,好,这样就可以了。那下面呢,我们在初始化这个数据之前啊,我们把那个neo4j重新初始化一下,就把里面数据全给它删掉。因为之前里面有一些特殊数据。把那个data目录删掉。 |


1 | 停一下。嗯。在其中。好,这样的话就可以了,注意因为你在这把那个data目录删了之后啊,你再重新启动之后呢,我们需要去修改一下密码。 |

1 | 那接下来我们需要到这儿来初始化数据了。我们在这先连到这个bin/cypher-shell这里面。bin/cypher-shell -a bolt://bigdata04:7687 -u neo4j -p admin。 |

1 | 好,那这里面呢,我们首先要针对这个关键字段建立索引,对吧。咱们前面是不是已经做过这种操作呀。所以下面呢,我们还需要执行一个这个。好,执行成功,那我们到界面上来确认一下数据。好,是可以的。 |


1 | 注意,我们这儿呢,只初始化一个数据文件。那你说真的,我们实际工作中啊,我们有1000多个这种数据文件,你需要让我在这执行这个命令,执行1000多次吗?这肯定是不合理的。那如何一次性批量导入呢?可以这样来做啊。我们可以把这多个文件合并成一个大文件,对吧,你在Linux命行里面,你看这个文件,然后呢,通过这个右箭头>>重定向。把这个1000多个文件里面那个数据啊,整合到一个大文件里,这就可以了。好,那这样的话呢,针对数据计算的第一步初始化数据,这个我们就搞定了。 |