大数据开发工程师-第十八周-直播平台三度关系推荐v1-0-3


第十八周 直播平台三度关系推荐v1.0-3

数据计算之实时维护粉丝关注(第二个任务)

image-20230426121205538

1
接下来我们来看一下数据计算中的第二步,实时维护粉丝关注数据。我们的实时粉丝关注数据呢,来源于服务端日志,因为当用户在直播平台中对主播进行关注和取消关注的时候呢,会调用服务端接口。所以说服务端会记录这些操作日志。具体的数据格式呢,是这样。这是一个json格式,fuid就代表了粉丝。uid代表的是主播。好,这个timestamp,它表示这个具体你这个关注行为,或者你取消关注行为,它产生的时间。这个type呢,表示这个数据是什么类型的数据,它是粉丝关注相关的数据。那具体这条数据是关注还是取消关注,我们要根据这个desc这个参数来定。它里面这个值如果是follow就表示是关注,如果是UN follow,就表示取消关注。所以说后期我们在解析这个数据的时候呢,其实核心字段就是三个followeruid,还有这个followuid,还有这个desc。那接下来我们就需要使用sparkstreamming实时维护neo4j粉丝关注的相关数据。

父项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>db_video_recommend</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>generate_data</module>
<module>data_collect</module>
<module>server_inter</module>
<module>real_time_follow</module>
<module>update_user_level</module>
<module>update_user_active</module>
<module>update_video_info</module>
<module>get_recommend_list</module>
</modules>
<dependencyManagement>
<dependencies>
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>
<!-- spark相关依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- neo4j相关依赖 -->
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>neo4j-contrib</groupId>
<artifactId>neo4j-spark-connector</artifactId>
<version>2.4.5-M1</version>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<!-- list of other repositories -->
<repository>
<id>SparkPackagesRepo</id>
<url>http://dl.bintray.com/spark-packages/maven</url>
</repository>
</repositories>
</project>
1
以及添加一个repository,因为neo4j-spark-connector这个依赖在maven中央仓库中是没有的。
1
注意我在这个父项目创建module子项目,副项目这个pom文件里面啊,提前把相关的依赖啊都给它放进,我们可以看一下,主要是下面这些东西。这是Spark相关的依赖,Spark core、spark sql、 Spark streaming。还有这个spark streaming kafka对吧,因为你要读取kafka嘛。以及下面呢,是你后这相关的一些依赖。现在我提前把它拿过来,后面我们再具体用到的时候呢,我再去分析。

image-20230426121552111

image-20230426121614568

子项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>real_time_follow</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1
2
3
4
5
那针对我们这个实施项目,它里面都需要什么依赖呢?注意它里面呢,你首先需要这个spark streaming以及spark streaming kafka吧。以及用neo4j那个依赖。还有一个,因为我们的原始数据是json的,我们要解析json,所以说呢,还要用那个fastjson这个包。那所以说我们看一下啊。neo4j-java-driver就类似于我们要操作mysql一样,我们要添加mysql对应的一个connector。啊,这个是类似的啊。

在这呢,你可以把这个版本号给它删掉就行。因为在这啊,我们是在这个父项目里面统一来维护管理这些依赖,其实主要是管理这些版本。因为这个最外层呢,你看它套了一个dependency manager里面呢是一个dependency。所以说你看和这个比的话,它外面是多了一个这个depend manager。所以说呢,在这统一管理这些依赖的版本,你后期这里面这些子项目,你需要用到哪些依赖,你把这些依赖拿过来就可以使用了,这样的话你不需要指定版本,它呢会读取你那个父项目里面pom里面指定的这个版本。如果说你有个性化的需求,你说你那个公共的版本不满足我的要求,我想使用某一个特殊的版本,那么你可以在这来指定你的版本就可以了。这样的话只针对你这个子项目有效。那把这几个依赖加过来就可以了,现在我们来看看这个啊,前面这几个就没什么好说的了,主要看看这个neo4j-java-driver。就类似我们操作mysql一样,我们需要找到neo4j它的一个驱动jar包给大家分析一下我是在哪找到的?官网,这个其实啊,我们直接到这个maven仓库里面去搜也行,你到仓库里面去搜这个new瑞杠Java杠driver也是可以找到的,是一样的。这是这个流程。

好,那接下来呢,我们把这个项目它的一个技术环境,再给它配一下邮件。摩托塞定是。这样里面注意咱那个library,我们把这个scala2.11这个SDK给它加进来。因为针对Spark而言,我们用的是2.11。你在这呢,我们再建一个录入叫SC。把它设置为S。OK,注意前面我们在开发代码的时候,我们先用代码来开发。等最后的时候呢,我会把那个java代码具体的实现呢,也给大家提供过去。我们课上讲的时候以这个scala代码为主。
1
2
3
那接下来建一个包。com。点五克点Spark。在这里面,我们创建一个object。叫real time。follow。那在这我们先写点注释,这个呢是任务二了,就第二个任务。实施维护粉丝关注数据。你妈吗?对,在这我们一个streaming。context。它里面呢,需要接触一个SPA以及一个时间second。second。我们把这个周期呢设为五秒行吧。上面呢,我们来创建,你有一个Spark,这个都是固定的写法。said master。LOGO2。因为你是一个实时处理程序。set APP name。就是这个。提取的变量。好。好,这个呢,也提取一个变量有SSC吧。这个呢表示创建。context。

那接下来呢,我们需要获取。消费卡夫卡的数据流。这个呢,也是一个固定的写法。搞不搞?第二数一。我们使用那个direct。泛型呢,都是spring这个我们前面讲过啊嗯。把这个SSC传过去。后面呢,穿这个location。street。这个后面呢,直接那个五月份。好,它下面还有一个参数叫consumer。一个订阅功能啊。这块泛型呢,也都是three。sweets。注意这里面我们需要给它指定两个参数,一个呢是topics对吧,指定topic。还有一个呢,是卡夫卡的一个参数。需要这两个参数。那我们这上面需要重建一下。嗯。在这呢,指定要读取的topic的名称。主要是两个。那我们先创建第一个。他其实就是这个。我们直接使用个map。object。直接在这块来处理啊。首先在里面使用卡夫卡的一个扑克地址。RI。101冒号9092。逗号。0203把这个改一下,0203。好,这样第一个参数就搞定了。我们还需要指定一下这个K的一个序列化类型。前面先空着,我先把后面那个写,写完之后呢,复制一下。嗯。glass of string。这样就可以了。那前面的话呢,其实就是K点把这个拿过来。he ser。小写。这个呢是value的序列化类型,嗯。把这个复制一下,再改一下。好,这样就可以了,嗯。那下面来指定这个消费者组ID。嗯嗯。给我点ID。con吧,行吧。这个就是你来自己来指定。下面是一个消费策略。就你第一次消费的时候如何来消费?auto ofetet。就是读取最新的数据。这个是自动提交offset。嗯嗯。enable。there auto。这个是醋啊。现在啊,你最好给他强制指定类型,要不然用的时候呢,可能会有问题。加点allow their words。对吧,这个就可以了,那既然我们指定我们需要消费的这个topic。它是一个,它可以接收多个。注意,接着我们应该消费那个user。对吧,这里面是实时的粉丝关注相关的数据。OK,那这块呢就可以了,我们就可以获取到这个消费卡不卡的一个数据流了。这个是一个卡夫卡。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.imooc.spark

import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase, Transaction, TransactionWork}

/**
* 任务2:
* 实时维护粉丝关注数据
* Created by xuwei
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
//创建StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("RealTimeFollowScala")
val ssc = new StreamingContext(conf, Seconds(5))

//指定Kafka的配置信息
val kafkaParams = Map[String,Object](
//kafka的broker地址信息
"bootstrap.servers"->"bigdata01:9092,bigdata02:9092,bigdata03:9092",
//key的序列化类型
"key.deserializer"->classOf[StringDeserializer],
//value的序列化类型
"value.deserializer"->classOf[StringDeserializer],
//消费者组id
"group.id"->"con_1",
//消费策略
"auto.offset.reset"->"latest",
//自动提交offset
"enable.auto.commit"->(true: java.lang.Boolean)
)
//指定要读取的topic的名称
val topics = Array("user_follow")

//获取消费kafka的数据流
val kafkaDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

//处理数据
//首先将kafkaDStream转换为rdd,然后就可以调用rdd中的foreachPartition了
kafkaDStream.foreachRDD(rdd=>{
//一次处理一个分区的数据
rdd.foreachPartition(it=>{
//获取neo4j连接
val driver = GraphDatabase.driver("bolt://bigdata04:7687", AuthTokens.basic("neo4j", "admin"))
//开启一个会话
val session = driver.session()
it.foreach(record=>{
//获取粉丝关注相关数据
//{"followeruid":"1001","followuid":"1002","timestamp":1798198304,"type":"user_follow","desc":"follow"}
val line = record.value()
//解析数据
val userFollowObj = JSON.parseObject(line)
//获取数据类型,关注 or 取消关注
val followType = userFollowObj.getString("desc")
//获取followeruid
val followeruid = userFollowObj.getString("followeruid")
//获取followuid
val followuid = userFollowObj.getString("followuid")

if("follow".equals(followType)){
//添加关注:因为涉及多条命令,所以需要使用事务
session.writeTransaction(new TransactionWork[Unit] (){
override def execute(tx: Transaction): Unit = {
try{
tx.run("merge (:User {uid:'"+followeruid+"'})")
tx.run("merge (:User {uid:'"+followuid+"'})")
tx.run("match(a:User {uid:'"+followeruid+"'}),(b:User {uid:'"+followuid+"'}) merge (a) -[:follow]-> (b)")
//提交事务
tx.commit()
}catch {
case ex: Exception => tx.rollback()
}
}
})
}else{
//取消关注
session.run("match(:User {uid: '"+followeruid+"'}) -[r:follow]-> (:User {uid: '"+followuid+"'}) delete r")
}
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
})

//启动任务
ssc.start()
//等待任务停止
ssc.awaitTermination()
}
}

RealTimeFollowScala.scala

1
2
3
那下面我们就开始处理试卷。那你这边注意,你首先将这个卡不卡呀,转一下。转换为RDD。然后呢,就可以。就要用RDD中的这个for each part。好,第二和一起,阿D先把它转换成阿里D。这样这里面传过来其实就是阿里列。转成R之后,我们就可以调用它里面这个for each了,那这样的话就可以一次处理一批的数据。比如说一次处理一个分区的数。为什么要这样做呢?因为我们在这里面需要去连neo4j。我们要把这个具体用户实时的这个关注,以及取消关注这些关系呢,给他维护到neo4j里面,所以说在这呢,相当于我们需要获取用户这些链接,然后呢去操作它。

那这样的话,你最好是一批获取一次链接,这样效率比较高对吧。我每一条数据过来都获取链接,这样效率比较低。因为本身你这个实时其实就是一小批一小批的数据,那针对这一小批里面的数据,你可以再按分区对吧,每个分区获取一次链接。啊,for。这是一个AI,嗯。好,那在这里面。我们就可以调用那个it点,不好意思就可以迭代它里面的每一条数据了,对吧。这是一个了吧。那现在注意,我们需要先获取u或这个链接。怎么获取呢?因为我们提前已经把那个六后力杠Java杠driver那个依赖已经引进去了,所以说你在这儿可以直接用。哇。第二。我们使用这个。这个前面呢,是那个具体的URLURL其实就是那个开头的那个。这个零四冒号7687。后面的话我们需要指定下这个用户名和密码,就这个奥。basic。嗯,指定一下用户名。你徒弟还有密码。嗯。好,这样的话,下面我们就获取到这个用户的一个链接了。那下面的话,我们需要使用这个链接呢,开启一个会话。开启绘画之后才可以去用啊。这个。那这样的话,在里面就可以使用这个session。那你在最后啊,用完之后啊,需要把它们两个给它关闭掉,先把这个写一下,关闭会话。session。close。嗯。关闭连接。there are the clothes。好,那下面呢,我们来把里面这个代码给它完善一下,这里面是核心的代码。那在这里面,我们那就可以获取这个粉丝关注相关的数据。它其实呢,就是那个阶层串出了一块阶层数据。在这了。men。这样的话就可以获取到这行数据了。把这行计算数据获取到,获取到之后注意下面呢,我们来解析数据。用杰森。yeah。us objects。嗯,把它传过去。这个呢,其实就是user。follow。好,那接下来我们就从它里面去获取一些数据。获取数据类型。到底是?关注哦,取消关注。你又使用那个user点,咱们刚才分析了,它里面有一个DC这个词对吧。follow types。那接下来我们还需要获取这个follow u ID。嗯。thatw。嗯。W。what ID?接下来获取了一个follow u ID。follow u ID。嗯嗯。好,这三个核心的这个字段呢,我们都获取到。那获取到之后,下面的话,我们其实就可以根据这个photo这个类型来执行具体关注或者是取消关注这个操作。
1
2
3
4
5
6
那我们接着来判断一下。如果呀,这个。follow the equal。这个full time。如果说你这个值等于follow,那相当于就是关注。else。就是取消关注了。添加关注。这是取消关注。注意你说这种写法。和这样写,你说我把它放到前面。这里面呢,写那个follow有什么区别吗?注意你这种写法,如果说。他呢,是空是闹,你这样调用会报了一个空,人异常啊。但我把它放到前面,那肯定不会报空人了啊,这肯定是有值的。你就算你那个pro等于no,这个顶多是不相等,它也不会出现这个控制异常啊,这个需要注意一下。下面呢,我们来执行添加关注操作。这需要注意。因为呢,它这里面啊,会涉及多条命令。所以说呢,我们需要使用事物。类似于我们先需要添加两个节点,然后呢,再给这两个基点绑定一个关注关系。这样的话就是三条命令。这三条命令在执行的时候,要么都成功,要么都失败。好,那所以说在这我们需要这样来做,通过session。第二。right transaction。开启一个事物。你要传一个,你有一个栓死action。walk。注意这块这个泛型啊,表示你这里面这个代码啊,最终的返回的数据类型在这我们不需要返回东西,所以说呢。把要空就行了。实现里面微信的方法。就这个。好,那这里面就需要实现我们具体的逻辑了。注意。它这里面相当有一个数啊transaction,所以说我们直接调TS点。你看它里面传的是一个query string,其实就是具体的一个执行的一个语句。墨。user。ID。这是一个字符串,所以说要加个单引号啊。那我们在这获取这个UID。那下面还有一个。这是follow u ID对吧。先把这两个节点给它创建了。那下面我们要给它们两个绑定关系。注意你在这绑定关系的话,这个相当于它是分开执行的啊,只不过说他们在一个事物里面,所以你在这啊,先添加这两个下面再来查。match。把你刚才添加这两个节点查出来,再给它们两个绑定关系。user。ad。这是这个UID,我们先把这个查出来,然后呢,再把第二个。给它起个名字叫B。user。这是UID。ID对吧,这样的话,你看其实我们就把这两个节点个查出来了。你前面是添加在这,把他们两个都查出来,这查第一个,这是查第二个。并且呢,给每一个都起了一个别名,它叫a,它叫B,对吧。那后面呢,就可以使用这个。墨。a。好。这样就可以了。在这x.com提交15。那注意,如果说他在执行的时候失败了,这样的话,我们需要让这个事物回滚。catch。直接捕获这个最大的异常。只要抛一场,我们就调用它的一个back。这样就可以了。所以这里面啊,执行的还是咱们前面讲那个step语句对吧。OK,这是添加关注。那取消关注就简单了呀。取消关注,其实呢,一条命令就可以了。相当于呢,我使用match把它查出来,然后后边的话使用一个DD的,那这样的话,其实呢,你就不需要开启这种事物了,因为你相当于取消关注,只有一条命令,就不需要开启这个事故了啊,然后自动提交就行。所以说我们直接使用这个session点。软。match。user。UID。说了UID。然后面他呢。follow。这个哥们在。UID。follow u ID。注意这时候呢,给这个关系呢,起一个名字叫啊。所以说在最后面直接把这个关系给它删掉就行。这个意思啊。这个呢,就是取消关注,取消他们两个之间的佛的关系。这样的话其实就可以了啊。我们这里面的一个核心代码就搞定了。最后是一个启动任务。start。应该在这里面。这个是等待任务停止。嗯。嗯。好,这样就可以了。这样的话,我们这个代码啊,其实就开发好了。


那接下来我们需要确保卡夫卡服务cub的采集程序、数据分发程序以及服务端接口服务正常运行。那这样的话,我们就可以在本地去运行这个Spark人命程序了。好在这样我们把它启动起来。先在本地去执行一下。等这个程序启动之后呢,我们再去调那个接口,模拟产生这个用户实时的关注和取消关注数据。你先确认你这个人命程序是正常的

看到没有,它这个程序不停就说明了它是OK的。
1
好,那接下来呢,我们就来模拟产生一些数据。找到这个generate date这个项目对吧。我们调里面这个generate real time执行这个,它每执行一次都会产生一条这个实时的一个粉丝关注或者取消关注的数据。执行一下。

image-20230426163505984

1
是一个follow。你看这个2009FOLLOW了1005,那我们来看一下。在这刷新一下,或者你重新点那个follow,或者你点那个刷新都可以。看到没有2009FOLLOW了1005没问题吧。我们之前是没有这个的啊,之前它是没有这个follow关系的。

image-20230426163612378

1
2
3
OK,所以说呢,我们这个代码呢,正常执行了。

但是注意了,目前这个程序啊,其实是存在一些问题的,因为数据通过filebeat的采集,再到kafka,最终我们消费的数据顺序和数据产生的顺序可能就不一致了。

image-20230426164158476

1
2
3
4
5
6
7
8
9
10
11
举个例子:
用户A先关注了用户B
用户A很快又取关了用户B
这样就会产生两条日志数据,这两条数据经过采集分发之后,最后我们消费过来的数据顺序很有可能是这样的
用户A取关用户B
用户A关注了用户B
这种最终的结果就是用户A关注了用户B,那这样就不准确了,虽然这种情况是小概率事件,但是也是存在的,在SparkStreaming中如何解决呢?
由于SparkStreaming是一小批一小批处理的,所以我们可以针对每次获取的这一小批数据根据数据产生的时间戳进行排序,从小到大,然后按照这个顺序去操作这些数据,这样其实就能在很大程度上避免我们刚才分析的这种问题了,但是这样并没有完全解决掉这个问题,如果两条数据分到了两批数据里面,还是会存在这个问题的,不过这种情况出现的概率就很低了,我们暂时就忽略不计了。
在这我把这个思路分析好了,给大家留一个作业,大家下去之后自己尝试动手实现一下这个功能

使用sparkstreaming解决数据乱序的问题。因为我们的原始数据里面,里面有个FUID,还有UID,还有一个时间戳,对吧,那现在在sparkstreaming里面,你获取到这一批数据之后,你对这一批数据按照time stamp进行排序,从小到大。排完序之后呢,再去执行这些操作。这样就可以了。大家呢,先自己动手做一下啊。
1
2
3
那接下来呢,我们来对这个代码啊,再给它完善一下。可以让它呢,同时支持在本地运行和集群运行。把这个给它停掉。

这里面啊,可能会发生变化的这些信息呢,全部都给它提取出来。让它支持动态传递啊,这样的话,我们这个代码呢,更会更加通用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package com.imooc.spark

import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase, Transaction, TransactionWork}

/**
* 任务2:
* 实时维护粉丝关注数据
* Created by xuwei
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local[2]"
var appName = "RealTimeFollowScala"
var seconds = 5
var kafkaBrokers = "bigdata01:9092,bigdata02:9092,bigdata03:9092"
var groupId = "con_1"
var topic = "user_follow"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
seconds = args(2).toInt
kafkaBrokers = args(3)
groupId = args(4)
topic = args(5)
boltUrl = args(6)
username = args(7)
password = args(8)
}

//创建StreamingContext
val conf = new SparkConf().setMaster(masterUrl).setAppName(appName)
val ssc = new StreamingContext(conf, Seconds(seconds))

//指定kafka的配置信息
val kafkaParams = Map[String,Object](
//kafka的broker地址信息
"bootstrap.servers"->kafkaBrokers,
//key的序列化类型
"key.deserializer" ->classOf[StringDeserializer],
//value的序列化类型
"value.deserializer" ->classOf[StringDeserializer],
//消费者组id
"group.id" -> groupId,
//消费策略
"auto.offset.reset" -> "latest",
//自动提交offset
"enable.auto.commit" -> (true: java.lang.Boolean)
)

//指定要读取的topic的名称
val topics = Array(topic)

//获取消费kafka的数据流
val kafkaDstream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

//处理数据
//首先将kafkaDstream转换为rdd,然后就可以调用rdd中的foreachPartition了
kafkaDstream.foreachRDD(rdd=>{
//一次处理一个分区的数据
rdd.foreachPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(record=>{
//获取粉丝关注相关数据
val line = record.value()
//解析数据
val userFollowObj = JSON.parseObject(line)
//获取数据类型,关注 or 取消关注
val followType = userFollowObj.getString("desc")
//获取followeruid
val followeruid = userFollowObj.getString("followeruid")
//获取followuid
val followuid = userFollowObj.getString("followuid")

if("follow".equals(followType)){ // 顺序前后不要改
//添加关注:因为涉及多条命令,所以需要使用事务
session.writeTransaction(new TransactionWork[Unit] (){
override def execute(tx: Transaction): Unit = {
try{
tx.run("merge (:User {uid:'"+followeruid+"'})")
tx.run("merge (:User {uid:'"+followuid+"'})")
tx.run("match(a:User {uid:'"+followeruid+"'}),(b:User {uid:'"+followuid+"'}) merge (a) -[:follow]-> (b)")
//提交事务
tx.commit()
}catch {
case ex: Exception => tx.rollback()
}
}
})
}else{
//取消关注
session.run("match (:User {uid:'"+followeruid+"'}) -[r:follow]-> (:User {uid:'"+followuid+"'}) delete r")
}
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
})
//启动任务
ssc.start()
//等待任务停止
ssc.awaitTermination()
}
}
1
注意:建议这个项目中的所有依赖包全部在spark-submit脚本后面的–jars中指定(neo4j-java-driver、fastjson、spark-streaming-kafka这三个需要手动指定,其它的spark安装包已有),这样最终生成的任_务jar就比较小了,提交任务的时候速度会比较快。所以这里面的jar-with-dependencies插件就可以不使用了,因为我们打jar包的时候不需要把依赖打进去,这个时候也不需要在依赖中添加provided参数了。

image-20230426165401187

image-20230426170407568

1
2
3
为了方便的提交任务,我们再开发一个任务提交脚本,
在项目中创建一个bin目录,把脚本放到这个bin目录一样,这样便于管理维护
startRealTimeFollow.sh

startRealTimeFollow.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/bin/bash
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`

appName="RealTimeFollowScala"
seconds=5
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_1"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"

yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.RealTimeFollowScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/spark-streaming-kafka-0-10_2.11-2.4.3.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/kafka-clients-2.4.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/real_time_follow-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${seconds} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${username} ${password}
1
2
3
4
5
6
7
8
9
jar包路径可以是本地,也可以是hdfs上,建议hdfs

在本地如何找这些jar包?在.m2(是maven本地目录)目录下去找,然后再上传

注意:
针对这个参数:yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"
使用公共的依赖包目录,使用起来方便,管理维护起来也方便,并且还可以提高任务执行效率,因为当我们向集群提交的时候,任务需要的依赖jar包是会自动上传到hdfs的一个临时目录的,如果我们提前把jar包上传到hdfs上面,就不会再重新上传了

针对–jars后面指定的依赖jar包,需要额外再指定kafka-clients-2.4.1.jar和reactive-streams-1.0.3.jar,一个是kafka的,一个是neo4j需要依赖的,否则提交任务到集群执行是会报错的。

image-20230426182327467

image-20230426184121107

image-20230426184238164

image-20230426184330270

1
重新生成一条实时关注数据,查看结果

image-20230426184425485

1
2
3
4
5
6
停止sparkStreaming任务

这是正常的一帆风顺的流程。

那我们如果是第一次这样做,肯定会遇到各种各样的问题,在这里来给大家复现一下。
按照正常的思路,这个项目依赖的jar包最开始我们肯定只会使用这三个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/bin/bash
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`

appName="RealTimeFollowScala"
seconds=5
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_1"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"

yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.RealTimeFollowScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/spark-streaming-kafka-0-10_2.11-2.4.3.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar \
/data/soft/video_recommend/jobs/real_time_follow-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${seconds} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${username} ${password}
1
sh -x startRealTimeFollow.sh
1
2
3
此时你会发现任务执行失败了
报错信息如下,在控制台就可以看到具体的报错信息:
这个报错信息表示是缺少kafka的一些依赖,org/apache/kafka/common/serialization/StringDeserializer,通过这里面的包名可以看出来,这个是kafka-clients的依赖,如果直接看不出来,那就到网上搜一下这个报错信息看看有没有收获java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
1
2
3
4
5
6
7
8
9
10
所以我们在脚本中再添加这个kafka-client的依赖

再提交

发现任务还是执行失败,在控制台可以看到报错信息如下

这里提示neo4j中的org.neo4j.driver.Config初始化失败,但是neo4j的依赖我们也添加进去了,为什么还会报这个错呢?

如果这里看不到有用的错误信息,我们可以尝试到YARN中查看
在YARN的8088界面中我们进入到spark的任务界面(注意:需要确保hadoop的日志聚合功能开启,以及Spark的historyServer进程也是开启的。)

image-20230426212208228

image-20230426212224386

image-20230426212235821

1
2
3
4
5
6
其实根源是在这,主要是缺少这个类org.reactivestreams.Publisher,最终导致的org.neo4j.driver.Config无法初始化。

那这个类org.reactivestreams.Publisher是从哪来的呢?
它是neo4j需要使用的一个依赖里面的类
到哪找呢?
查看neo4j-java-driver(子pom里ctrl+点击neo4j-java-driver)这个依赖的依赖,会发现它里面确实有一个依赖的名称是org.reactivestreams,所以说我们还需要把这个依赖的jar包找到引进去。
1
2
3
到此为止,把这个jar包再加进去就可以正常执行了,这就是我们排查问题的一个思路和流程,这个思路大家一定要学以致用。

这个任务开发到这就结束了

数据计算之每天定时更新主播等级(第三个任务)

1
主播等级数据来源于服务端数据库(定时增量导入到HDFS中)

image-20230426213050640

1
2
3
4
5
6
注意:表中有两个等级字段,一个是用户等级,一个是主播等级

在这我们需要使用主播等级
针对这份数据,最核心的两个字段是第2列和第4列
第2列是用户uid
第4列是主播等级anchor_level
1
2
3
4
这个任务需要做的是把每天主播等级发生了变化的数据更新到neo4j中,在neo4j中也维护一份主播的等级
创建一个子module:update_user_level
创建scala目录,添加scala2.11的sdk
引入依赖

所需依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>

UpdateUserLevelScala.scala

1
2
3
在update_user_level下面的scala里面创建包:com.imooc.spark
创建类:UpdateUserLevelScala
代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.imooc.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}

/**
* 任务3:
* 每天定时更新主播等级
* Created by xuwei
*/
object UpdateUserLevelScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "UpdateUserLevelScala"
var filePath = "hdfs://bigdata01:9000/data/cl_level_user/20260201"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
filePath = args(2)
boltUrl = args(3)
username = args(4)
password = args(5)
}

//获取SparkContext
val conf = new SparkConf()
.setMaster(masterUrl)
.setAppName(appName)
val sc = new SparkContext(conf)

//读取用户等级数据
val linesRDD = sc.textFile(filePath)

//校验数据准确性
val filterRDD = linesRDD.filter(line => {
val fields = line.split("\t")
//判断每一行的列数是否正确,以及这一行是不是表头
if (fields.length == 8 && !fields(0).equals("id")) {
true
} else {
false
}
})

//处理数据
filterRDD.foreachPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(line=>{
val fields = line.split("\t")
//添加等级
session.run("merge(u:User {uid:'"+fields(1).trim+"'}) set u.level = "+fields(3).trim) //.trim防止空格被引入
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
}
}

本地执行

1
2
在本地执行代码
效果如下:

image-20230426221422858

startUpdateUserLevel.sh

1
2
开发任务执行脚本
startUpdateUserLevel.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#!/bin/bash
#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi

#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/cl_level_user/${dt}"

masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`


appName="UpdateUserLevelScala"`date +%s` // 加的这个是为了后面过滤任务列表,查看是否执行成功
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"

yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateUserLevelScala \
--jars ${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_user_level-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

image-20230426223703754

image-20230426223841949

image-20230426223922101

打包配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

子项目完整pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>update_user_level</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

image-20230426224642629

集群执行

1
sh -x startUpdateUserLevel.sh 20260201

image-20230426225404311

image-20230426225455351

数据计算之每天定时更新用户活跃时间(第四个任务)

1
2
数据来源于客户端上报,每天只要打开过APP就会上报数据
数据格式:

image-20230426225850610

1
2
之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录
使用代码GenerateUserActiveDataV2,在代码中指定日期2026-02-01,这样会把模拟生成的用户活跃数据直接上传到hdfs上面,因为之前的数据采集流程我们已经详细分析过了,所以在这就直接把数据上传到hdfs上面了。

生成数据

1
2
执行代码:GenerateUserActiveDataV2,将会把数据上传到hdfs的这个目录下
hdfs://bigdata01:9000/data/user_active/20260201/

image-20230426231654458

image-20230426232332517

1
2
3
4
5
这个任务需要做的是把每天主动活跃的用户更新到neo4j中,在neo4j中维护一份用户的最新活跃时间
创建子module项目:update_user_active
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark
引入依赖

所需依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>

UpdateUserActiveScala.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.imooc.spark

import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}

/**
* 任务4:
* 每天定时更新用户活跃时间
* Created by xuwei
*/
object UpdateUserActiveScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "UpdateUserActiveScala"
var filePath = "hdfs://bigdata01:9000/data/user_active/20260201"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
filePath = args(2)
boltUrl = args(3)
username = args(4)
password = args(5)
}

//获取SparkContext
val conf = new SparkConf()
.setMaster(masterUrl)
.setAppName(appName)
val sc = new SparkContext(conf)

//读取用户活跃数据
val linesRDD = sc.textFile(filePath)

//处理数据
linesRDD.foreachPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(line=>{
val jsonObj = JSON.parseObject(line)
val uid = jsonObj.getString("uid")
val timeStamp = jsonObj.getString("UnixtimeStamp")
//添加用户活跃时间
session.run("merge(u:User {uid:'"+uid+"'}) set u.timestamp = "+timeStamp)
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
}
}

本地执行

image-20230426233615284

startUpdateUserActive.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#!/bin/bash
#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi

#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/user_active/${dt}"

masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`


appName="UpdateUserActiveScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"

yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateUserActiveScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_user_active-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

打包配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

子项目完整pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>update_user_active</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

集群执行

1
sh -x startUpdateUserActive.sh 20260201

image-20230426234450357

image-20230426234521405

数据计算之每周一计算最近一个月主播视频评级-1(第五个任务)

1
2
视频数据来源于服务端,当主播开播结束后会产生一条视频数据
数据格式:

image-20230426234917792

1
2
3
4
5
之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录
使用代码GenerateVideoInfoDataV2,在代码中指定日期2026-02-01,这样会把模拟生成的用户活跃数据直接上传到hdfs上面,因为之前的数据采集流程我们已经详细分析过了,所以在这就直接把数据上传到hdfs上面了。

执行代码:GenerateVideoInfoDataV2,将会把数据上传到hdfs的这个目录下
hdfs://bigdata01:9000/data/video_info/20260201/

生成数据

image-20230426235956140

1
2
3
4
5
6
7
8
9
10
11
这个任务需要做的就是统计最近一个月内主播的视频评级信息
在这我们先初始化一天的数据即可,计算一天和计算一个月的数据,计算逻辑是一样的,只有spark任务的输入路径不一样
如果是一个月的数据,假设这一个月有30天,则需要把这30天对应的30个目录使用逗号分隔,拼接成一个字符串,作为Spark任务的输入即可。

为什么这个任务要每周计算一次,而不是每天计算一次呢?
因为很多主播不会每天都开播,所以我们每天都计算意义不大,均衡考虑之后按照每周计算一次这个频率。

创建子module项目:update_video_info
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark
引入依赖

所需依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>

UpdateVideoInfoScala.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.imooc.spark

import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.LoggerFactory

/**
* 任务5:
* 每周一计算最近一个月主播视频评级
* 把最近几次视频评级在3B+或2A+的主播,在neo4j中设置flag=1
*
* 注意:在执行程序之前需要先把flag=1的重置为0
* Created by xuwei
*/
object UpdateVideoInfoScala {
val logger = LoggerFactory.getLogger("UpdateVideoInfoScala")

def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "UpdateVideoInfoScala"
var filePath = "hdfs://bigdata01:9000/data/video_info/20260201"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
filePath = args(2)
boltUrl = args(3)
username = args(4)
password = args(5)
}

//在Driver端执行此代码,将flag=1的重置为0
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
session.run("match(a:User) where a.flag =1 set a.flag = 0")
//关闭会话
session.close()
//关闭连接
driver.close()


//获取SparkContext
val conf = new SparkConf()
.setMaster(masterUrl)
.setAppName(appName)
val sc = new SparkContext(conf)

//读取视频评级数据
val linesRDD = sc.textFile(filePath)

//解析数据中的uid,rating,timestamp
val tup3RDD = linesRDD.map(line => {
try {
val jsonObj = JSON.parseObject(line)
val uid = jsonObj.getString("uid")
val rating = jsonObj.getString("rating")
val timestamp: Long = jsonObj.getLong("timestamp")
(uid, rating, timestamp)
} catch {
case ex: Exception => logger.error("json数据解析失败:" + line)
("0", "0", 0L)
}
})

//过滤异常数据
val filterRDD = tup3RDD.filter(_._2 != "0")

//获取用户最近3场直播(视频)的评级信息
val top3RDD = filterRDD.groupBy(_._1).map(group => {
//获取最近3次开播的数据,使用制表符拼接成一个字符串
//uid,rating,timestamp \t uid,rating,timestamp \t uid,rating,timestamp
val top3 = group._2.toList.sortBy(_._3).reverse.take(3).mkString("\t")
(group._1, top3)
})

//过滤出来满足3场B+的数据
val top3BRDD = top3RDD.filter(tup => {
var flag = false
val fields = tup._2.split("\t")
if (fields.length == 3) {
//3场B+,表示里面没有出现C和D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)
if (!tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})

//把满足3场B+的数据更新到neo4j中,增加一个字段flag,flag=1表示是视频评级满足条件的主播,允许推荐给用户
//注意:针对3场B+的数据还需要额外再限制一下主播等级,主播等级需要>=15,这样可以保证筛选出来的主播尽可能是一些优质主播
top3BRDD.foreachPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match (a:User {uid:'"+tup._1+"'}) where a.level >=15 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
})

//过滤出来满足2场A+的数据
val top2ARDD = top3RDD.filter(tup => {
var flag = false
val fields = tup._2.split("\t")
if (fields.length >= 2) {
//2场A+,获取最近两场直播评级,里面不能出现B、C、D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1)
if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})

//把满足2场A+的数据更新到neo4j中,设置flag=1
//注意:针对2场A+的数据还需要额外限制一下主播等级,主播等级需要>=4 这样可以保证筛选出来的主播尽可能是一些优质主播
top2ARDD.foreachPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match (a:User {uid:'"+tup._1+"'}) where a.level >=4 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
})
}
}

本地执行

1
2
3
在本地执行代码
然后到neo4j的web界面查看结果,发现只有uid为1005的数据对应的flag不等于1(没有flag属性)
这样是正确的。

image-20230427104641625

image-20230427104824499

startUpdateVideoInfo.sh

1
2
3
下面开发任务执行脚本
注意:这个脚本中需要实现获取最近一个月的数据目录
startUpdateVideoInfo.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/bin/bash

# 获取最近一个月的文件目录
#filepath=""
#for((i=1;i<=30;i++))
#do
# filepath+="hdfs://bigdata01:9000/data/video_info/"`date -d "$i days ago" +"%Y%m%d"`, // 这里的,号很经典
#done


#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi

#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/video_info/${dt}"

masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`


appName="UpdateVideoInfoScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"

yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateVideoInfoScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_video_info-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

子项目完整依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>update_user_active</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

集群执行

1
sh -x startUpdateVideoInfo.sh 20260201

数据计算之每周一计算三度关系推荐列数据(第六个任务)

1
2
3
4
5
6
7
8
前面我们在neo4j中维护了粉丝和主播的一些信息,在这里我们就需要基于neo4j中的数据统计主播的三度关系推荐列表了

这个任务在这也是每周计算一次,我们测试过,每天都计算的话,最终的结果变化是不大的,所以就没必要每天计算了。

创建子module项目:get_recommend_list
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark
引入依赖,这里面需要额外用到spark-sql和neo4j-spark-connector这两个依赖

生成数据

所需依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>neo4j-contrib</groupId>
<artifactId>neo4j-spark-connector</artifactId>
</dependency>
1
注意:在使用spark读取neo4j中数据的时候,可以使用一个插件,在官网中可以找到

image-20230428134919089

image-20230428134931154

image-20230428135056752

1
2
这个版本是基于neo4j 4.0,我们现在使用的neo4j是3.5的,这种一般是向下兼容的,所以操作neo4j 3.5也是可以的,后面写的spark是2.4.5,这个也是可以的,我们使用的spark是2.4.3的,最后一位版本号不一致没问题。
目前最新版本是基于scala2.12版本编译的,我们在spark项目中使用的scala版本是2.11,所以使用2.4.5-M1这个版本。
1
注意:在使用这个依赖的时候,还需要配置它对应的repository,因为这个依赖没有在maven仓库中,把这些配置添加到父项目的pom.xml文件中

image-20230428135228605

image-20230428135241954

1
2
3
4
咱们前面使用的neo4j-java-driver相当于是使用原生代码操作neo4j,而现在使用neo4j-spark-connector相当于把neo4j封装到spark中了,使用起来比较方便。

在使用neo4j-spark-connector的时候,选择哪个版本呢?
点这里进去看一下

GetRecommendListScala.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package com.imooc.spark

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.neo4j.spark.Neo4j

import scala.collection.mutable.ArrayBuffer

/**
* 任务6:
* 每周一计算最近一周内主活主播的三度关系列表
* 注意:
* 1:待推荐主播最近一周内活跃过
* 2:待推荐主播等级>4
* 3:待推荐主播最近1个月视频评级满足3B+或2A+(flag=1)
* 4:待推荐主播的粉丝列表关注重合度>2
* Created by xuwei
*/
object GetRecommendListScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "GetRecommendListScala"
var boltUrl = "bolt://bigdata04:7687"
var username = "neo4j"
var password = "admin"
var timestamp = 0L //过滤最近一周内是否活跃过
var duplicateNum = 2 //粉丝列表关注重合度
var level = 4 //主播等级
var outputPath = "hdfs://bigdata01:9000/data/recommend_data/20260201"
if(args.length > 0){
masterUrl = args(0)
appName = args(1)
boltUrl = args(2)
username = args(3)
password = args(4)
timestamp = args(5).toLong
duplicateNum = args(6).toInt
level = args(7).toInt
outputPath = args(8)
}


//获取SparkContext
val conf = new SparkConf()
.setAppName(appName)
.setMaster(masterUrl)
.set("spark.driver.allowMultipleContexts","true")//允许创建多个context
.set("spark.neo4j.url",boltUrl)//bolt的地址
.set("spark.neo4j.user",username)//neo4j用户名
.set("spark.neo4j.password",password)//neo4j密码
val sc = new SparkContext(conf)

//获取一周内主活的主播 并且主播等级大于4的数据
var params = Map[String,Long]()
params += ("timestamp"->timestamp)
params += ("level"->level)
val neo4j: Neo4j = Neo4j(sc).cypher("match (a:User) where a.timestamp >= {timestamp} and a.level >= {level} return a.uid").params(params)

//将从neo4j中查询出来的数据转换为rowRDD
//val rowRDD = neo4j.loadRowRdd
//repartition 这里的repartition是为了把数据分为7份,这样下面的mapPartitions在执行的时候就有7个线程
//这7个线程并行查询neo4j数据库
val rowRDD = neo4j.loadRowRdd.repartition(7)

//一次处理一批
//过滤出粉丝关注重合度>2的数据,并且对关注重合度倒序排列
//最终的数据格式是:主播id,待推荐的主播id
val mapRDD = rowRDD.mapPartitions(it => {
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))
//开启一个会话
val session = driver.session()
//保存计算出来的结果
val resultArr = ArrayBuffer[String]()
it.foreach(row => {
val uid = row.getString(0)
//计算一个用户的三度关系(主播的二度关系)
//注意:数据量大了之后,这个计算操作是非常耗时
//val result = session.run("match (a:User {uid:'" + uid + "'}) <-[:follow]- (b:User) -[:follow]-> (c:User) return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
//对b、c的主活时间进行过滤,以及对c的level和flag值进行过滤
val result = session.run("match (a:User {uid:'" + uid + "'}) <-[:follow]- (b:User) -[:follow]-> (c:User) " +
"where b.timestamp >= " + timestamp + " and c.timestamp >= " + timestamp + " and c.level >= " + level + " and c.flag = 1 " +
"return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
while (result.hasNext) {
val record = result.next()
val sum = record.get("sum").asInt()
if (sum > duplicateNum) {
resultArr += record.get("auid").asString() + "\t" + record.get("cuid").asString()
}
}
})
//关闭会话
session.close()
//关闭连接
driver.close()
resultArr.iterator
}).persist(StorageLevel.MEMORY_AND_DISK) //把RDD数据缓存起来

//把数据转成tuple2的形式
val tup2RDD = mapRDD.map(line => {
val splits = line.split("\t")
(splits(0), splits(1))
})
//根据主播id进行分组,可以获取到这个主播的待推荐列表
val reduceRDD = tup2RDD.reduceByKey((v1, v2) => {
v1 + "," + v2
})

//最终把结果组装成这种形式
//1001 1002,1003,1004
reduceRDD.map(tup=>{
tup._1+"\t"+tup._2
}).repartition(1).saveAsTextFile(outputPath)
}

}

本地执行

1
先使用这一行代码,在计算三度关系数据的时候暂时先不进行条件过滤。

image-20230428153711075

image-20230428153735751

1
接着使用这一行代码,在计算三度关系数据的对数据进行条件过滤

image-20230428153929765

image-20230428154023306

1
到neo4j中验证一下,确实是正确的,因为1005的flag不为1,被过滤掉了,所以我在关注1000这个主播的时候平台只需要给我推荐主1004这个主播即可。
1
注意:这个代码在执行mapPartitions的时候,最好把rowRDD的分区重新设置一下,如果让程序自动分配的话可能不太合理,分多了分少了都不太好
1
2
3
4
5
由于我们在mapPartitions中需要操作neo4j,所以这个时候rowRDD分区的数量就可以等于(neo4j服务器的CPU数量-1),要给neo4j预留出来一个cpu来处理其它任务请求。
我们当时的服务器是8个CPU,给neo4j预留出来一个,剩下还有7个,所以说,neo4j此时可以对外提供的最大并发处理能力是7,那我们就把rowRDD设置为7个分区,就会有7个线程并行处理数据,它们可以并行操作neo4j,这样效率最高。
如果给rowRDD设置的分区太多,对应的就会有多个线程并行操作neo4j,会给neo4j造成很大的压力,相当于neo4j在满负荷的运行,这个时候我们另外一个实时维护neo4j中粉丝关注数据的程序执行起来就很慢了,以及其他人如果这个时候想查询neo4j,也会非常慢,所以这样就不太好了。
如果给rowRDD设置的分区太少,对应产生的执行线程就比较少,此时neo4j会比较空闲,没有多大压力,但是我们这个三度关系的任务执行就非常慢了。
综上所述,建议把rowRDD的分区数量设置为7,这样可以充分利用neo4j服务器的性能,也不至于把neo4j服务器拖垮。
image-20230428154738077
1
还有一点可以优化的,增加RDD持久化,把RDD数据缓存起来,这样可以避免个别task失败导致的数据重算,因为这个计算还是比较消耗时间的,所以说尽可能保证计算出来的数据不丢失。

image-20230428155333860

1
问题:大家有没有想过,我们是否可以直接在Neo4j(sc).cypher(…)中指定一条查询语句,直接把所有的三度关系全部查询出来?
1
这样理论上是可以的,但是在实际中,当neo4j中存储的节点数和关系数量达到千万级别之后,同时查询所有满足条件主播的三度关系推荐列表的时候会很慢,有时候会导致等了几十分钟也查询不出来数据,所以在这我们就把这个功能进行了拆解,先查看满足条件的主播uid列表,然后再一个一个计算这些主播的三度关系推荐列表,这样可以提高计算效率,并且不会出现查询不出来结果的情况。

startGetRecommendList.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/bin/bash
#默认获取上周一的时间
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi

masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`


appName="GetRecommendListScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
username="neo4j"
password="admin"
#获取上周一的时间戳(单位:毫秒)
timestamp=`date --date="${dt}" +%s`000
#粉丝列表关注重合度
duplicateNum=2
#主播等级
level=4

#输出结果数据路径
outputPath="hdfs://bigdata01:9000/data/recommend_data/${dt}"


yarnCommonLib="hdfs://bigdata01:9000/yarnCommonLib"

spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.GetRecommendListScala \
--jars ${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar,${yarnCommonLib}/neo4j-spark-connector-2.4.5-M1.jar \
/data/soft/video_recommend/jobs/get_recommend_list-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${boltUrl} ${username} ${password} ${timestamp} ${duplicateNum} ${level} ${outputPath}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

子项目完整依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>get_recommend_list</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>neo4j-contrib</groupId>
<artifactId>neo4j-spark-connector</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

集群执行

1
sh -x startGetRecommendList.sh 20260201

image-20230428160537691

数据计算之三度关系数据导出到Mysql(第七个任务)

1
2
3
4
5
6
7
8
9
10
接下来我们需要使用Sqoop将HDFS中计算好的三度关系推荐列表数据导出到MySQL中
需要在mysql中创建一个表recommend_list
这个表有两列:
第一列为主播uid
第二列为待推荐主播uid

当用户关注某个主播的时候,会根据这个主播的uid到这个表里面进行查询,把待推荐主播uid获取到,在页面中进行展现,推荐给用户,这样就都可以实现三度关系推荐的效果了。

建表语句如下:
recommend_list.sql

recommend_list.sql

1
2
3
4
5
6
drop table if exists recommend_list;
create table recommend_list(
uid varchar(255) NOT NULL,
recommend_uids varchar(500) NOT NULL,
primary key (uid)
)

export_recommend_list.sh

1
2
3
4
接下来使用sqoop将hdfs中的输出导出到mysql中
在导出的时候实现插入和更新功能,如果uid对应的数据已存在,则更新,如果不存在则插入
开发一个脚本
export_recommend_list.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#默认获取上周一的时间
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi

sqoop export \
--connect jdbc:mysql://192.168.182.1:3306/video?serverTimezone=UTC \
--username root \
--password admin \
--table recommend_list \
--export-dir hdfs://bigdata01:9000/data/recommend_data/${dt} \
--input-fields-terminated-by '\t' \
--update-key uid \
--update-mode allowinsert
1
sh -x exexport_recommend_list.sh 20260201

image-20230428175018471

1
其实在实际工作中我们需要做的到这就可以了,然后把这个数据库的名称、表名、表中的字段含义写一个文档同步给服务端即可,具体的数据交互是由服务端和客户端进行对接的。

数据展现

1
2
在后面的v2.0架构中,我们会开发一个接口,对外提供数据,因为直接把数据库暴露给其它用户不太安全,倒不是怕他们删库跑路,是担心他们误操作把某些数据删掉了。
等到我们在v2.0中开发了数据接口之后,我们再通过本地启动项目进行效果演示。

项目代码双语支持

1
2
3
咱们前面在开发具体的数据计算代码的时候,使用的都是scala代码,为了兼顾Java开发人员,针对数据处理中的功能代码在这我也提供了Java代码支持
在这里这些java代码我就不再手敲了,到时候直接把对应的java代码一起提交到git上面,大家如果有需要了可以去一下。
其实我是不建议在工作中使用java去开发spark和flink的,在这只是为了给大家多一个选择而已,根据行业经验而言,scala语言是开发spark和flink最好的选择。
1
2
3
注意:
在v1.0中,我们侧重于学习整个项目的业务、架构逻辑和实战代码开发。
针对项目调忧、项目数据规模、集群规模、Neo4j性能指标等内容,在后面的V2.0中给大家安排上。

本文标题:大数据开发工程师-第十八周-直播平台三度关系推荐v1-0-3

文章作者:TTYONG

发布时间:2023年04月24日 - 15:04

最后更新:2023年05月26日 - 18:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%85%AB%E5%91%A8-%E7%9B%B4%E6%92%AD%E5%B9%B3%E5%8F%B0%E4%B8%89%E5%BA%A6%E5%85%B3%E7%B3%BB%E6%8E%A8%E8%8D%90v1-0-3.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%