第十八周 直播平台三度关系推荐v1.0-3
数据计算之实时维护粉丝关注(第二个任务)

1 | 接下来我们来看一下数据计算中的第二步,实时维护粉丝关注数据。我们的实时粉丝关注数据呢,来源于服务端日志,因为当用户在直播平台中对主播进行关注和取消关注的时候呢,会调用服务端接口。所以说服务端会记录这些操作日志。具体的数据格式呢,是这样。这是一个json格式,fuid就代表了粉丝。uid代表的是主播。好,这个timestamp,它表示这个具体你这个关注行为,或者你取消关注行为,它产生的时间。这个type呢,表示这个数据是什么类型的数据,它是粉丝关注相关的数据。那具体这条数据是关注还是取消关注,我们要根据这个desc这个参数来定。它里面这个值如果是follow就表示是关注,如果是UN follow,就表示取消关注。所以说后期我们在解析这个数据的时候呢,其实核心字段就是三个followeruid,还有这个followuid,还有这个desc。那接下来我们就需要使用sparkstreamming实时维护neo4j粉丝关注的相关数据。 |
父项目pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
1 | 以及添加一个repository,因为neo4j-spark-connector这个依赖在maven中央仓库中是没有的。 |
1 | 注意我在这个父项目创建module子项目,副项目这个pom文件里面啊,提前把相关的依赖啊都给它放进,我们可以看一下,主要是下面这些东西。这是Spark相关的依赖,Spark core、spark sql、 Spark streaming。还有这个spark streaming kafka对吧,因为你要读取kafka嘛。以及下面呢,是你后这相关的一些依赖。现在我提前把它拿过来,后面我们再具体用到的时候呢,我再去分析。 |


子项目pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
1 | 那针对我们这个实施项目,它里面都需要什么依赖呢?注意它里面呢,你首先需要这个spark streaming以及spark streaming kafka吧。以及用neo4j那个依赖。还有一个,因为我们的原始数据是json的,我们要解析json,所以说呢,还要用那个fastjson这个包。那所以说我们看一下啊。neo4j-java-driver就类似于我们要操作mysql一样,我们要添加mysql对应的一个connector。啊,这个是类似的啊。 |
1 | 那接下来建一个包。com。点五克点Spark。在这里面,我们创建一个object。叫real time。follow。那在这我们先写点注释,这个呢是任务二了,就第二个任务。实施维护粉丝关注数据。你妈吗?对,在这我们一个streaming。context。它里面呢,需要接触一个SPA以及一个时间second。second。我们把这个周期呢设为五秒行吧。上面呢,我们来创建,你有一个Spark,这个都是固定的写法。said master。LOGO2。因为你是一个实时处理程序。set APP name。就是这个。提取的变量。好。好,这个呢,也提取一个变量有SSC吧。这个呢表示创建。context。 |
1 | package com.imooc.spark |
RealTimeFollowScala.scala
1 | 那下面我们就开始处理试卷。那你这边注意,你首先将这个卡不卡呀,转一下。转换为RDD。然后呢,就可以。就要用RDD中的这个for each part。好,第二和一起,阿D先把它转换成阿里D。这样这里面传过来其实就是阿里列。转成R之后,我们就可以调用它里面这个for each了,那这样的话就可以一次处理一批的数据。比如说一次处理一个分区的数。为什么要这样做呢?因为我们在这里面需要去连neo4j。我们要把这个具体用户实时的这个关注,以及取消关注这些关系呢,给他维护到neo4j里面,所以说在这呢,相当于我们需要获取用户这些链接,然后呢去操作它。 |
1 | 那我们接着来判断一下。如果呀,这个。follow the equal。这个full time。如果说你这个值等于follow,那相当于就是关注。else。就是取消关注了。添加关注。这是取消关注。注意你说这种写法。和这样写,你说我把它放到前面。这里面呢,写那个follow有什么区别吗?注意你这种写法,如果说。他呢,是空是闹,你这样调用会报了一个空,人异常啊。但我把它放到前面,那肯定不会报空人了啊,这肯定是有值的。你就算你那个pro等于no,这个顶多是不相等,它也不会出现这个控制异常啊,这个需要注意一下。下面呢,我们来执行添加关注操作。这需要注意。因为呢,它这里面啊,会涉及多条命令。所以说呢,我们需要使用事物。类似于我们先需要添加两个节点,然后呢,再给这两个基点绑定一个关注关系。这样的话就是三条命令。这三条命令在执行的时候,要么都成功,要么都失败。好,那所以说在这我们需要这样来做,通过session。第二。right transaction。开启一个事物。你要传一个,你有一个栓死action。walk。注意这块这个泛型啊,表示你这里面这个代码啊,最终的返回的数据类型在这我们不需要返回东西,所以说呢。把要空就行了。实现里面微信的方法。就这个。好,那这里面就需要实现我们具体的逻辑了。注意。它这里面相当有一个数啊transaction,所以说我们直接调TS点。你看它里面传的是一个query string,其实就是具体的一个执行的一个语句。墨。user。ID。这是一个字符串,所以说要加个单引号啊。那我们在这获取这个UID。那下面还有一个。这是follow u ID对吧。先把这两个节点给它创建了。那下面我们要给它们两个绑定关系。注意你在这绑定关系的话,这个相当于它是分开执行的啊,只不过说他们在一个事物里面,所以你在这啊,先添加这两个下面再来查。match。把你刚才添加这两个节点查出来,再给它们两个绑定关系。user。ad。这是这个UID,我们先把这个查出来,然后呢,再把第二个。给它起个名字叫B。user。这是UID。ID对吧,这样的话,你看其实我们就把这两个节点个查出来了。你前面是添加在这,把他们两个都查出来,这查第一个,这是查第二个。并且呢,给每一个都起了一个别名,它叫a,它叫B,对吧。那后面呢,就可以使用这个。墨。a。好。这样就可以了。在这x.com提交15。那注意,如果说他在执行的时候失败了,这样的话,我们需要让这个事物回滚。catch。直接捕获这个最大的异常。只要抛一场,我们就调用它的一个back。这样就可以了。所以这里面啊,执行的还是咱们前面讲那个step语句对吧。OK,这是添加关注。那取消关注就简单了呀。取消关注,其实呢,一条命令就可以了。相当于呢,我使用match把它查出来,然后后边的话使用一个DD的,那这样的话,其实呢,你就不需要开启这种事物了,因为你相当于取消关注,只有一条命令,就不需要开启这个事故了啊,然后自动提交就行。所以说我们直接使用这个session点。软。match。user。UID。说了UID。然后面他呢。follow。这个哥们在。UID。follow u ID。注意这时候呢,给这个关系呢,起一个名字叫啊。所以说在最后面直接把这个关系给它删掉就行。这个意思啊。这个呢,就是取消关注,取消他们两个之间的佛的关系。这样的话其实就可以了啊。我们这里面的一个核心代码就搞定了。最后是一个启动任务。start。应该在这里面。这个是等待任务停止。嗯。嗯。好,这样就可以了。这样的话,我们这个代码啊,其实就开发好了。 |
1 | 好,那接下来呢,我们就来模拟产生一些数据。找到这个generate date这个项目对吧。我们调里面这个generate real time执行这个,它每执行一次都会产生一条这个实时的一个粉丝关注或者取消关注的数据。执行一下。 |

1 | 是一个follow。你看这个2009FOLLOW了1005,那我们来看一下。在这刷新一下,或者你重新点那个follow,或者你点那个刷新都可以。看到没有2009FOLLOW了1005没问题吧。我们之前是没有这个的啊,之前它是没有这个follow关系的。 |

1 | OK,所以说呢,我们这个代码呢,正常执行了。 |

1 | 举个例子: |
1 | 那接下来呢,我们来对这个代码啊,再给它完善一下。可以让它呢,同时支持在本地运行和集群运行。把这个给它停掉。 |
1 | package com.imooc.spark |
1 | 注意:建议这个项目中的所有依赖包全部在spark-submit脚本后面的–jars中指定(neo4j-java-driver、fastjson、spark-streaming-kafka这三个需要手动指定,其它的spark安装包已有),这样最终生成的任_务jar就比较小了,提交任务的时候速度会比较快。所以这里面的jar-with-dependencies插件就可以不使用了,因为我们打jar包的时候不需要把依赖打进去,这个时候也不需要在依赖中添加provided参数了。 |


1 | 为了方便的提交任务,我们再开发一个任务提交脚本, |
startRealTimeFollow.sh
1 | #!/bin/bash |
1 | jar包路径可以是本地,也可以是hdfs上,建议hdfs |




1 | 重新生成一条实时关注数据,查看结果 |

1 | 停止sparkStreaming任务 |
1 | #!/bin/bash |
1 | sh -x startRealTimeFollow.sh |
1 | 此时你会发现任务执行失败了 |
1 | 所以我们在脚本中再添加这个kafka-client的依赖 |



1 | 其实根源是在这,主要是缺少这个类org.reactivestreams.Publisher,最终导致的org.neo4j.driver.Config无法初始化。 |
1 | 到此为止,把这个jar包再加进去就可以正常执行了,这就是我们排查问题的一个思路和流程,这个思路大家一定要学以致用。 |
数据计算之每天定时更新主播等级(第三个任务)
1 | 主播等级数据来源于服务端数据库(定时增量导入到HDFS中) |

1 | 注意:表中有两个等级字段,一个是用户等级,一个是主播等级 |
1 | 这个任务需要做的是把每天主播等级发生了变化的数据更新到neo4j中,在neo4j中也维护一份主播的等级 |
所需依赖
1 | <dependency> |
UpdateUserLevelScala.scala
1 | 在update_user_level下面的scala里面创建包:com.imooc.spark |
1 | package com.imooc.spark |
本地执行
1 | 在本地执行代码 |

startUpdateUserLevel.sh
1 | 开发任务执行脚本 |
1 | !/bin/bash |



打包配置
1 | <build> |
子项目完整pom
1 | <?xml version="1.0" encoding="UTF-8"?> |

集群执行
1 | sh -x startUpdateUserLevel.sh 20260201 |


数据计算之每天定时更新用户活跃时间(第四个任务)
1 | 数据来源于客户端上报,每天只要打开过APP就会上报数据 |

1 | 之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录 |
生成数据
1 | 执行代码:GenerateUserActiveDataV2,将会把数据上传到hdfs的这个目录下 |


1 | 这个任务需要做的是把每天主动活跃的用户更新到neo4j中,在neo4j中维护一份用户的最新活跃时间 |
所需依赖
1 | <dependency> |
UpdateUserActiveScala.scala
1 | package com.imooc.spark |
本地执行

startUpdateUserActive.sh
1 | !/bin/bash |
打包配置
1 | <build> |
子项目完整pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
集群执行
1 | sh -x startUpdateUserActive.sh 20260201 |


数据计算之每周一计算最近一个月主播视频评级-1(第五个任务)
1 | 视频数据来源于服务端,当主播开播结束后会产生一条视频数据 |

1 | 之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录 |
生成数据

1 | 这个任务需要做的就是统计最近一个月内主播的视频评级信息 |
所需依赖
1 | <dependency> |
UpdateVideoInfoScala.scala
1 | package com.imooc.spark |
本地执行
1 | 在本地执行代码 |


startUpdateVideoInfo.sh
1 | 下面开发任务执行脚本 |
1 | !/bin/bash |
子项目完整依赖
1 | <?xml version="1.0" encoding="UTF-8"?> |
集群执行
1 | sh -x startUpdateVideoInfo.sh 20260201 |
数据计算之每周一计算三度关系推荐列数据(第六个任务)
1 | 前面我们在neo4j中维护了粉丝和主播的一些信息,在这里我们就需要基于neo4j中的数据统计主播的三度关系推荐列表了 |
生成数据
所需依赖
1 | <dependency> |
1 | 注意:在使用spark读取neo4j中数据的时候,可以使用一个插件,在官网中可以找到 |



1 | 这个版本是基于neo4j 4.0,我们现在使用的neo4j是3.5的,这种一般是向下兼容的,所以操作neo4j 3.5也是可以的,后面写的spark是2.4.5,这个也是可以的,我们使用的spark是2.4.3的,最后一位版本号不一致没问题。 |
1 | 注意:在使用这个依赖的时候,还需要配置它对应的repository,因为这个依赖没有在maven仓库中,把这些配置添加到父项目的pom.xml文件中 |


1 | 咱们前面使用的neo4j-java-driver相当于是使用原生代码操作neo4j,而现在使用neo4j-spark-connector相当于把neo4j封装到spark中了,使用起来比较方便。 |
GetRecommendListScala.scala
1 | package com.imooc.spark |
本地执行
1 | 先使用这一行代码,在计算三度关系数据的时候暂时先不进行条件过滤。 |


1 | 接着使用这一行代码,在计算三度关系数据的对数据进行条件过滤 |


1 | 到neo4j中验证一下,确实是正确的,因为1005的flag不为1,被过滤掉了,所以我在关注1000这个主播的时候平台只需要给我推荐主1004这个主播即可。 |
1 | 注意:这个代码在执行mapPartitions的时候,最好把rowRDD的分区重新设置一下,如果让程序自动分配的话可能不太合理,分多了分少了都不太好 |
1 | 由于我们在mapPartitions中需要操作neo4j,所以这个时候rowRDD分区的数量就可以等于(neo4j服务器的CPU数量-1),要给neo4j预留出来一个cpu来处理其它任务请求。 |
1 | 还有一点可以优化的,增加RDD持久化,把RDD数据缓存起来,这样可以避免个别task失败导致的数据重算,因为这个计算还是比较消耗时间的,所以说尽可能保证计算出来的数据不丢失。 |

1 | 问题:大家有没有想过,我们是否可以直接在Neo4j(sc).cypher(…)中指定一条查询语句,直接把所有的三度关系全部查询出来? |
1 | 这样理论上是可以的,但是在实际中,当neo4j中存储的节点数和关系数量达到千万级别之后,同时查询所有满足条件主播的三度关系推荐列表的时候会很慢,有时候会导致等了几十分钟也查询不出来数据,所以在这我们就把这个功能进行了拆解,先查看满足条件的主播uid列表,然后再一个一个计算这些主播的三度关系推荐列表,这样可以提高计算效率,并且不会出现查询不出来结果的情况。 |
startGetRecommendList.sh
1 | !/bin/bash |
子项目完整依赖
1 | <?xml version="1.0" encoding="UTF-8"?> |
集群执行
1 | sh -x startGetRecommendList.sh 20260201 |

数据计算之三度关系数据导出到Mysql(第七个任务)
1 | 接下来我们需要使用Sqoop将HDFS中计算好的三度关系推荐列表数据导出到MySQL中 |
recommend_list.sql
1 | drop table if exists recommend_list; |
export_recommend_list.sh
1 | 接下来使用sqoop将hdfs中的输出导出到mysql中 |
1 | #默认获取上周一的时间 |
1 | sh -x exexport_recommend_list.sh 20260201 |

1 | 其实在实际工作中我们需要做的到这就可以了,然后把这个数据库的名称、表名、表中的字段含义写一个文档同步给服务端即可,具体的数据交互是由服务端和客户端进行对接的。 |
数据展现
1 | 在后面的v2.0架构中,我们会开发一个接口,对外提供数据,因为直接把数据库暴露给其它用户不太安全,倒不是怕他们删库跑路,是担心他们误操作把某些数据删掉了。 |
项目代码双语支持
1 | 咱们前面在开发具体的数据计算代码的时候,使用的都是scala代码,为了兼顾Java开发人员,针对数据处理中的功能代码在这我也提供了Java代码支持 |
1 | 注意: |