大数据开发工程师-第十八周 直播平台三度关系推荐v2.0-2


第十八周 直播平台三度关系推荐v2.0-2

每周一计算最近一周内主活主播的三度关系列表(任务六)

1
2
3
4
5
使用Flink程序实现每周一计算最近一周内主活主播的三度关系列表
创建子module项目:get_recommend_list
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖

子项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend_v2</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>get_recommend_list</artifactId>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
1
2
3
在resources目录中添加log4j.properties配置文件

注意:此时需要通过flink读取neo4j中的数据,但是针对DataSet是不支持addSource方法的,但是它里面有一个createInput,可以接收一个自定义的InputFormat,所以我就需要定义一个Neo4jInputFormat了

Neo4jInputFormat.scala

1
2
3
4
创建类:Neo4jInputFormat
代码如下:

注意:此代码中的输入组件只能使用单并行度执行,如果使用多并行度查询可能会出现重复数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package com.imooc.flink

import org.apache.flink.api.common.io.statistics.BaseStatistics
import org.apache.flink.api.common.io.{DefaultInputSplitAssigner, NonParallelInput, RichInputFormat}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.{GenericInputSplit, InputSplit, InputSplitAssigner}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Result, Session}

/**
* 从Neo4j中查询满足条件的主播
* 一周内活跃过,并且主播等级大于4
* Created by xuwei
*/
class Neo4jInputFormat extends RichInputFormat[String,InputSplit] with NonParallelInput{ // 这里是多继承
//注意:with NonParallelInput 表示此组件不支持多并行度

//保存neo4j相关的配置参数
var param: Map[String,String] = Map()

var driver: Driver = _
var session: Session = _
var result: Result = _

/**
* 构造函数
* 接收neo4j相关的配置参数
* @param param
*/
def this(param: Map[String,String]){
this()
this.param = param
}

/**
* 配置此输入格式
* @param parameters
*/
override def configure(parameters: Configuration): Unit = {}

/**
* 获取输入数据的基本统计信息
* @param cachedStatistics
* @return
*/
override def getStatistics(cachedStatistics: BaseStatistics): BaseStatistics = {
cachedStatistics
}

/**
* 对输入数据切分split
* @param minNumSplits
* @return
*/
override def createInputSplits(minNumSplits: Int): Array[InputSplit] = {
Array(new GenericInputSplit(0,1))
}

/**
* 获取切分的split
* @param inputSplits
* @return
*/
override def getInputSplitAssigner(inputSplits: Array[InputSplit]): InputSplitAssigner = {
new DefaultInputSplitAssigner(inputSplits)
}

/**
* 初始化方法:只执行一次
* 获取neo4j连接,开启会话
*/
override def openInputFormat(): Unit = {
//初始化Neo4j连接
this.driver = GraphDatabase.driver(param("boltUrl"), AuthTokens.basic(param("userName"), param("passWord")))
//开启会话
this.session = driver.session()
}

/**
* 关闭Neo4j连接
*/
override def closeInputFormat(): Unit = {
if(driver!=null){
driver.close()
}
}

/**
* 此方法也是只执行一次
* @param split
*/
override def open(split: InputSplit): Unit = {
this.result = session.run("match (a:User) where a.timestamp >="+param("timestamp")+" and a.level >= "+param("level")+" return a.uid")
}

/**
* 如果数据读取完毕号以后,需要返回true
* @return
*/
override def reachedEnd(): Boolean = {
!result.hasNext
}

/**
* 读取结果数据,一次读取一条
* @param reuse
* @return
*/
override def nextRecord(reuse: String): String = {
val record = result.next()
val uid = record.get(0).asString()
uid
}

/**
* 关闭会话
*/
override def close(): Unit = {
if(session!=null){
session.close()
}
}
}

GetRecommendList.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.imooc.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}

import scala.collection.mutable.{ArrayBuffer, ListBuffer}

/**
* 任务6:
* 每周一计算最近一周内主活主播的三度关系列表
* 注意:
* 1:待推荐主播最近一周内活跃过
* 2:待推荐主播等级>4
* 3:待推荐主播最近1个月视频评级满足3B+或2A+(flag=1)
* 4:待推荐主播的粉丝列表关注重合度>2
* Created by xuwei
*/
object GetRecommendListScala {
def main(args: Array[String]): Unit = {
var appName = "GetRecommendListScala"
var boltUrl = "bolt://bigdata04:7687"
var userName = "neo4j"
var passWord = "admin"
var timestamp = 0L //过滤最近一周内是否活跃过
var dumplicateNum = 2 //粉丝列表关注重合度
var level = 4 //主播等级
var outputPath = "hdfs://bigdata01:9000/data/recommend_data/20260201"
if(args.length>0){
appName = args(0)
boltUrl = args(1)
userName = args(2)
passWord = args(3)
timestamp = args(4).toLong
dumplicateNum = args(5).toInt
level = args(6).toInt
outputPath = args(7)
}

val env = ExecutionEnvironment.getExecutionEnvironment
//添加隐式转换代码
import org.apache.flink.api.scala._
val param = Map("boltUrl"->boltUrl,"userName"->userName,"passWord"->passWord,"timestamp"->timestamp.toString,"level"->level.toString)
//获取一周内主活的主播 并且主播等级大于4的数据
val uidSet = env.createInput(new Neo4jInputFormat(param))

//一次处理一批
//过滤出粉丝关注重合度>2的数据,并且对关注重合度倒序排序
//最终的数据格式是:主播id,待推荐的主播id
val mapSet = uidSet.mapPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
//保存计算出来的结果
val resultArr = ArrayBuffer[String]()
it.foreach(uid=>{
//计算一个用户的三度关系(主播的三度关系)
//注意:数据量打了之后,这个计算操作是非常耗时
val result = session.run("match (a:User {uid:'"+uid+"'}) <-[:follow]- (b:User) -[:follow]-> (c:User) return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")
//对b、c的主活时间进行过滤,以及对c的level和flag值进行过滤
/*val result = session.run("match (a:User {uid:'"+uid+"'}) <-[:follow]- (b:User) -[:follow]-> (c:User)" +
" where b.timestamp >= "+timestamp+" and c.timestamp >= "+timestamp+" and c.level >= "+level +" and c.flag =1 " +
" return a.uid as auid,c.uid as cuid,count(c.uid) as sum order by sum desc limit 30")*/
while(result.hasNext){
val record = result.next()
val sum = record.get("sum").asInt()
if(sum > dumplicateNum){
resultArr += record.get("auid").asString()+"\t"+record.get("cuid").asString()
}
}
})
//关闭会话
session.close()
//关闭连接
driver.close()
resultArr.iterator
})

//把数据转成tupl2的形式
val tup2Set = mapSet.map(line => {
val splits = line.split("\t")
(splits(0), splits(1))
})

//根据主播id进行分组,可以获取到这个主播的待推荐列表
val reduceSet = tup2Set.groupBy(_._1).reduceGroup(it => {
val list = it.toList
val tmpList = ListBuffer[String]()
for (l <- list) {
tmpList += l._2
}
//把结果组装成这种形式 1001 1002,1003,1004
(list.head._1, tmpList.toList.mkString(","))
})

//注意:writeAsCsv只能保存tuple类型的数据
//writerAsText可以支持任何类型,如果是对象,会调用对象的toString方法写入到文件中
reduceSet.writeAsCsv(outputPath,"\n","\t")

//执行任务
env.execute(appName)
}
}
1
2
3
其实我们也可以直接在这里将结果输入写入到redis中,不过为了整体看起来更加规范,在这就先把数据临时写到hdfs上面。

在本地执行代码,然后到hdfs上面确认结果

image-20230516145733981

打包

1
2
接下来对程序编译打包
在pom.xml中添加编译打包配置

startGetRecommendList.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/bin/bash
#默认获取上周一的时间
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi


masterUrl="yarn-cluster"
appName="GetRecommendListScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"
#获取上周一的时间戳(单位:毫秒)
timestamp=`date --date="${dt}" +%s`000
#粉丝列表关注重合度
dumplicateNum=2
#主播等级
level=4

#输出结果数据路径
outputPath="hdfs://bigdata01:9000/data/recommend_data/${dt}"


#注意:需要将flink脚本路径配置到linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.GetRecommendListScala \
/data/soft/video_recommend_v2/jobs/get_recommend_list-1.0-SNAPSHOT-jar-with-dependencies.jar ${appName} ${boltUrl} ${userName} ${passWord} ${timestamp} ${dumplicateNum} ${level} ${outputPath}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

提交任务

1
sh -x startGetRecommendList.sh 20260201

image-20230516150429277

三度关系列表数据导出到Redis(任务七)

1
2
3
使用Flink程序实现将三度关系列表数据导出到Redis

注意:此任务每周执行一次,在任务6执行完毕以后执行这个。
1
2
3
4
创建子module项目:export_data
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend_v2</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>export_data</artifactId>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
1
2
3
在resources目录中添加log4j.properties配置文件
创建类:ExportDataScala
代码如下:

ExportDataScala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.imooc.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import redis.clients.jedis.Jedis

/**
* 任务7:
* 将三度列表关系数据导出到Redis
* Created by xuwei
*/
object ExportDataScala {
def main(args: Array[String]): Unit = {
var filePath = "hdfs://bigdata01:9000/data/recommend_data/20260125"
var redisHost = "bigdata04"
var redisPort = 6379
if(args.length>0){
filePath = args(0)
redisHost = args(1)
redisPort = args(2).toInt
}

val env = ExecutionEnvironment.getExecutionEnvironment
//读取hdfs中的数据
val text = env.readTextFile(filePath)

//添加隐式转换代码
import org.apache.flink.api.scala._
//处理数据
text.mapPartition(it=>{
//获取jedis连接
val jedis = new Jedis(redisHost, redisPort)
//开启管道(提高性能,不开启也没事)
val pipeline = jedis.pipelined()
it.foreach(line=>{
val fields = line.split("\t")
//获取uid
val uid = fields(0)
//获取待推荐主播列表
val recommend_uids = fields(1).split(",")

//注意:在这里给key起一个有意义的名字,l表示list类型、rec是recommend的简写(简写是因为key要放内存)
val key = "l_rec_"+uid

//先删除(保证每周更新一次),pipeline中的删除操作在scala语言下使用有问题
jedis.del(key)
for(r_uid <- recommend_uids){
pipeline.rpush(key,r_uid)
//给key设置一个有效时间,30天,如果30天数据没有更新,则删除此key
pipeline.expire(key,30*24*60*60)
}
})
//提交管道中的命令
pipeline.sync()
//关闭jedis连接
jedis.close()
""
}).print()
}
}
1
注意:在执行代码之前,需要先把redis服务启动起来

本地执行

1
在本地执行代码,到redis中验证效果。

image-20230516164750844

image-20230516163851685

image-20230516164936168

打包

1
2
接下来对程序编译打包
在pom.xml中添加编译打包配置

startExportData.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/bin/bash
#默认获取上周一的时间
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi

#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/recommend_data/${dt}"

masterUrl="yarn-cluster"
appName="ExportDataScala"`date +%s`
redisHost="bigdata04"
redisPort=6379


#注意:需要将flink脚本路径配置到linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.ExportDataScala \
/data/soft/video_recommend_v2/jobs/export_data-1.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${redisHost} ${redisPort}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

提交任务

1
2
3
向集群提交任务,先把redis中之前生成的数据删一下

sh -x startExportData.sh 20260201
1
任务成功执行,验证redis中的结果也是正确的

数据接口定义及开发(java web了解即可)

1
2
前面我们把结果数据计算好了,那接下来我们需要开发数据接口,对外提供数据。
首先定义接口文档

数据接口文档定义

1
为了方便跨部门数据使用,我们需要定义接口文档,便于其他部门的同事使用我们的数据

image-20230516170118438

数据接口代码开发

1
2
3
4
开发数据接口需要用到javaweb项目,在这给大家演示一下如何基于spring-boot搭建一个javaweb项目
创建子module项目:data_server
在pom.xml中添加依赖
首先添加spring-boot的依赖,还有fastjson依赖,因为我们后面在传输数据的时候需要使用json格式

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend_v2</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>data_server</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.1.RELEASE</version>
<configuration>
<mainClass>com.imooc.Application</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
1
2
3
4
在项目的resource目录中添加这两个文件

application.properties
logback.xml
1
2
3
4
创建包:com.imooc
然后把下面这几个文件夹及文件拷贝到com.imooc包里面
controller
Application.java

image-20230516171622873

1
2
3
直接在Application类中右键执行,就可以启动这个javaweb项目,项目内部已经集成了tomcta容器,监听的端口是8085
验证项目是否可以正常访问。
在浏览器中访问

image-20230516172612542

1
2
能看到结果数据说明此项目的基础框架是ok的,接下来我们就来开发一个接口
由于在这我们需要操作redis,所以需要到pom.xml中增加jedis的依赖,以及把我们之前开发的RedisUtils工具类也拷贝过来
1
2
3
4
在com.imooc下创建utils目录,把RedisUtils拷贝到里面

接下来到DataController类中增加一个方法:getRecommendList
代码如下:

DataController.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.imooc.controller;

import com.alibaba.fastjson.JSONObject;
import com.imooc.utils.RedisUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;

import java.util.List;

/**
* 数据接口V1.0
* Created by xuwei
*/
@RestController//控制器类
@RequestMapping("/v1")//映射路径
public class DataController {
private static final Logger logger = LoggerFactory.getLogger(DataController.class);
/**
* 测试接口
* @param name
* @return
*/
@RequestMapping(value="/t1",method = RequestMethod.GET)
public String test(@RequestParam("name") String name) {

return "hello,"+name;
}

/**
* 根据主播uid查询三度关系推荐列表数据
*
* 返回数据格式:
* {"flag":"success/error","msg":"错误信息","rec_uids":["1005","1004"]}
* @param uid
* @return
*/
@RequestMapping(value="/get_recommend_list",method = RequestMethod.GET)
public JSONObject getRecommendList(@RequestParam("uid") String uid) {
JSONObject resobj = new JSONObject();
String flag = "success";
String msg = "ok";
try{
Jedis jedis = RedisUtils.getJedis();
//获取待推荐列表数据
List<String> uidList = jedis.lrange("l_rec_" + uid, 0, -1);
String[] uidArr = uidList.toArray(new String[0]);
resobj.put("rec_uids",uidArr);
}catch (Exception e){
flag = "error";
msg = e.getMessage();
logger.error(msg);
}
resobj.put("flag",flag);
resobj.put("msg",msg);

return resobj;
}
}

Application.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.imooc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* Spring boot 入口启动程序
* Created by xuwei
*/
@SpringBootApplication //定义springboot入口程序
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
1
2
重新启动data_server项目
然后在浏览器中访问刚才开发的接口,能看到正常输出结果则说明此接口是正常的。

image-20230516173008401

打包

1
nohup java -jar data_server-1.0-SNAPSHOT.jar &
1
2
3
这个命令相当于模拟浏览器的请求
curl -XGET 'http://bigdata04:8085/v1/get_recommend_list?uid=1000'
{"msg":"ok","flag":"success","rec_uids":["1005","1004"]}

image-20230516175456652

数据展示

1
略略略

项目扩展优化

如何保证在Neo4j中维护平台全量粉丝关注数据

1
2
3
4
5
针对粉丝关注数据我们有两份
第一份是历史粉丝关注数据
第二份是实时粉丝关注数据

如何通过这两份数据实现维护平台全量粉丝关注数据呢?
1
2
3
4
5
6
7
8
9
10
11
背景是这样的
历史粉丝数据是由服务端每天晚上0点的时候定时同步到mysql数据库中的,因为之前平台是把粉丝的关注数据,存储到了redis中,每天晚上定时向mysql中同步一次。
实时粉丝数据在准备做这个项目之前通过日志采集工具把这些数据采集到kafka里面了

基于此,假设我们是在2026年2月1日那天上午10点开始将mysql中的历史数据导出来,然后批量导入到neo4j中,mysql中的粉丝数据其实是截止到2026年2月1日0点的。
这个导入过程当时耗时将近2天。
也就是在2026年2月3日上午10点左右导入完毕的,此时neo4j中的粉丝关注数据是截止到2026年2月1日0点的。

接下来我们需要通过kafka来将这两天内的粉丝关注数据读取出来,补充到neo4j中,如何实现呢?
因为我们的kafka当时是保存3天的数据,所以说这里面保存的还有2026-01-31 10点左右开始的数据,所以说当时我们开发好SparkStreaming程序之后,使用一个新的消费者groupid,然后将auto.offset.reset设置为earlist,读取最早的数据,这样就可以将这个topic中前3天的数据都读出来,然后在neo4j中进行维护,这样其实会重复执行2026-01-31 10点到2月1 日0点之间的数据,但是对最终的结果是没有影响的。
这样就可以实现在neo4j中全量维护粉丝关注数据了。

image-20230516210302781

如何解决数据乱序导致的粉丝关注关系不准确

1
2
通过在sparkStreaming内部对读取到的一小批数据基于时间进行排序,按照时间顺序执行粉丝关注相关操作,这样可以从一定程度上解决数据乱序的问题
在v2.0中,我们使用了Flink计算引擎,此时可以使用watermark+eventtime来解决数据乱序的问题。

如何优化三度关系推荐列表数据计算程序

1
2
3
4
5
6
针对三度关系推荐列表数据计算程序:GetRecommendListScala
这个任务在执行的时候需要执行20个小时左右,因为这里面会先查询出来满足条件的主播,然后挨个计算这些主播的三度关系数据,这里面需要和neo4j进行交互,主要慢在了neo4j这里,因为三度关系查询是比较复杂的,所以会比较耗时。
这个任务在执行的时候我们会发现它有时候无缘无故的提示task丢失,进而导致任务失败,还得重新计算,代价太大,所以这样不太靠谱。
后来发现是由于spark离线任务执行时间过长的时候会出现这种task丢失的问题。
所以后来我们对这个程序又做了优化。
针对第一步计算出来的主播列表,分成20份保存到hdfs上面

项目数据规模

集群资源规模(HDP集群)

集群数据规模

Neo4j性能指标

Neo4j核心参数修改


本文标题:大数据开发工程师-第十八周 直播平台三度关系推荐v2.0-2

文章作者:TTYONG

发布时间:2023年05月03日 - 17:05

最后更新:2023年05月20日 - 20:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%85%AB%E5%91%A8-%E7%9B%B4%E6%92%AD%E5%B9%B3%E5%8F%B0%E4%B8%89%E5%BA%A6%E5%85%B3%E7%B3%BB%E6%8E%A8%E8%8D%90v2-0-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%