大数据开发工程师-第十八周 直播平台三度关系推荐v2.0-1


第十八周 直播平台三度关系推荐v2.0-1

V1.0架构存在的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
V1.0这个架构里面其实存在三个主要的问题

SparkStreaming程序的实时性不够
其实说实话,针对目前的粉丝实时关注数据,使用SparkStreaming程序来维护问题也不大,但是我们程序猿是要有追求的,既然有更好的方案,那我们肯定不能使用差的,所以这块我们我们需要使用Flink来实现,它可以提供真正意义上的实时。

三度关系推荐数据适合存储在缓存系统中(Redis)
咱们前面把最终计算好的三度关系数据保存在了MySQL中。
其实这种数据是比较适合存储到一些基于内存的缓存系统中的,对查询的性能要求比较高,并且这些数据也是有时效性的,需要定时更新和删除老数据,所以说此时,使用redis是比较合适的,redis的查询性能比较高,并且redis中可以给key设置一个生存时间,可以实现定时删除过期数据的效果。
咱们这份数据是有2个字段,第一列是主播uid,第二列是待推荐的主播uid
存储到redis里面的话value使用list类型即可,在list类型里面存储待推荐的主播列表

为了规范数据使用,建议开发数据接口
咱们前面把存储系统直接暴露给其他业务部门,是不太安全的,并且他们使用起来也不方便,在实际工作中,各个业务部门之间进行数据交互,正规流程都是提供接口。
还有一个原因是这样的,我们在开发一个功能的时候,假设需要前端和后端同时开发,这个时候我们就需要提前把数据接口定义好,先提供假数据,这样前端和后端可以同时进行开发,前端在开发页面功能的时候就可以使用我们提供的数据接口了,当我们把后端功能搞定以后,修改数据接口的底层逻辑代码,接入真实的数据即可,这个时候对前端而言是没有任何影响的,就算后期我们修改表结构了,对前端也没有什么影响,只要接口没有变就行,这样也可以实现前端和后端的解耦。

image-20230502090405428

技术选型

1
2
接下来再来回顾一下技术选型,看看在V2.0中有哪些变化
还是这4大块

image-20230502090508445

1
2
3
4
5
6
7
在数据采集这块,去掉了Sqoop,因为在这里我们需要将三度关系数据导出到redis中,sqoop是不支持的,所以我们需要开发flink程序实现数据导出功能。

在数据存储这块,我们把MySQL改为了Redis

在数据计算这块,我们把Spark计算引擎改为了Flink,在V2.0中,我们将之前使用Spark开发的代码都使用Flink重新实现一遍。

数据展现模块没有变化。

整体架构设计

image-20230502090921788

1
2
3
4
5
这是最新的架构图,在这里面,替换了Spark计算引擎和MySQL数据库,引入了Flink和Redis
以及在这里引入了数据接口模块,通过接口对外提供数据。
其它的地方没有变化。

注意:其实针对离线计算使用Spark或者Flink没有多大区别,不过我们还是希望一个项目中的计算框架相对来说是统一的,这样好管理,也好维护,所以在V2.0架构中,不管是离线计算还是实时计算,都使用Flink实现。

数据采集模块开发

1
数据采集模块没有变化,所以在这就不再分析了。

image-20230502091109759

数据计算核心指标详细分析

image-20230502091324304

1
2
这里面的数据源和计算指标都没有变化
一共还是这7个步骤

历史粉丝关注数据初始化(任务一)

1
2
3
第一步:历史粉丝关注数据初始化
这块流程是没有变化的,使用load csv将我们之前导出的历史粉丝关注数据进行初始化即可。
把neo4j中之前的数据清空一下,直接删除neo4j下面的data目录即可,然后启动neo4j
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
通过浏览器访问neo4j,重新设置密码

然后我们使用neo4j的shell命令行执行下面命令。

连接neo4j
bin/cypher-shell -a bolt://bigdata04:7687 -u neo4j -p admin

建立索引
CREATE CONSTRAINT ON (user:User) ASSERT user.uid IS UNIQUE;

批量导入数据
USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM 'file:///follower_00.log' AS line FIELDTERMINATOR '\t'
MERGE (viewer:User { uid: toString(line.fuid)})
MERGE (anchor:User { uid: toString(line.uid)})
MERGE (viewer)-[:follow]->(anchor);

image-20230502091913756

实时维护粉丝关注数据(任务二)

image-20230502092054017

1
2
3
4
5
6
7
8
使用Flink程序实时维护Neo4j中粉丝关注数据
先创建maven项目db_video_recommend_v2
在pom.xml中添加项目需要用到的所有依赖

再创建子module项目:real_time_follow
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖

父项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>db_video_recommend_v2</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>real_time_follow</module>
<module>update_user_level</module>
<module>update_user_active</module>
<module>update_video_info</module>
<module>get_recommend_list</module>
<module>export_data</module>
<module>data_server</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!-- flink相关依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.1</version>
</dependency>
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
</dependency>
<!-- neo4j相关依赖 -->
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>4.1.1</version>
</dependency>
<!-- jedis依赖 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!--spring-boot依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
</dependencies>
</dependencyManagement>

</project>

子项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend_v2</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>real_time_follow</artifactId>
<dependencies>
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>

<!-- flink的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
</dependency>

<!-- neo4j的依赖 -->
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>

<!-- fastjson的依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

RealTimeFollowScala.scala

1
2
3
在resources目录中添加log4j.properties配置文件
创建类:RealTimeFollowScala
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.imooc.flink

import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
* 任务2:
* 实时维护粉丝关注数据
* Created by xuwei
*/
object RealTimeFollowScala {
def main(args: Array[String]): Unit = {
var appName = "RealTimeFollowScala"
var kafkaBrokers = "bigdata01:9092,bigdata02:9092,bigdata03:9092"
var groupId = "con_f_1"
var topic = "user_follow"
var boltUrl = "bolt://bigdata04:7687"
var userName = "neo4j"
var passWord = "admin"
if(args.length>0){
appName = args(0)
kafkaBrokers = args(1)
groupId = args(2)
topic = args(3)
boltUrl = args(4)
userName = args(5)
passWord = args(6)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

//指定FlinkKafkaConsumer相关配置
val prop = new Properties()
prop.setProperty("bootstrap.servers",kafkaBrokers)
prop.setProperty("group.id",groupId)
val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
//kafka consumer的消费策略设置
kafkaConsumer.setStartFromGroupOffsets()

//指定kafka作为source
import org.apache.flink.api.scala._
val text = env.addSource(kafkaConsumer)

//解析json数据中的核心字段
val tupStream = text.map(line => {
val jsonObj = JSON.parseObject(line)
val desc = jsonObj.getString("desc")
val followerUid = jsonObj.getString("followeruid")
val followUid = jsonObj.getString("followuid")
(desc, followerUid, followUid)
})

//使用Neo4jSink维护粉丝关注数据
val param = Map("boltUrl"->boltUrl,"userName"->userName,"passWord"->passWord)
tupStream.addSink(new Neo4jSink(param))

env.execute(appName)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
注意:由于flink中的实时计算是来一条数据计算一次,在StreamAPI中没有mapPartition方法,不支持一批一批的处理,如果每处理一条数据就获取一次Neo4j数据库连接,这样效率就太差了,所以我们需要实现一个自定义的sink组件,在sink组件内部有一个初始化函数可以获取一次连接,多次使用,这样就不需要频繁创建neo4j数据库连接了。

实现自定义的sink需要实现SinkFunction接口或者继承RichSinkFunction
具体实现逻辑可以参考已有connector中针对sink组件的实现
例如:RedisSink
源码在这里:
https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

这里面一共有三个主要的函数:
1:open,是一个初始化方法,在Sink组件初始化的时候执行一次,适合在里面初始化一些资源连接
2:invoke,会被频繁调用,sink接收到一条数据这个方法就会执行一次,具体的业务逻辑在这里实现
3:close,当任务停止的时候,会先调用sink组件中的close方法,适合在里面做一些关闭资源的操作

Neo4jSink.scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.imooc.flink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Transaction, TransactionWork}

/**
* 维护粉丝数据在Neo4j中的关注关系
* Created by xuwei
*/
class Neo4jSink extends RichSinkFunction[Tuple3[String,String,String]]{
//保存neo4j相关的配置参数
var param: Map[String,String] = Map()

var driver: Driver = _

/**
* 构造函数
* 接收neo4j相关的配置参数
* @param param
*/
def this(param: Map[String,String]){
this()
this.param = param
}
/**
* 初始化方法,只执行一次
* 适合初始化资源连接
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
this.driver = GraphDatabase.driver(param("boltUrl"), AuthTokens.basic(param("userName"), param("passWord")))
}

/**
* 核心代码,来一条数据,此方法会执行一次
* @param value
* @param context
*/
override def invoke(value: (String, String, String), context: SinkFunction.Context[_]): Unit = {
//开启会话
val session = driver.session()
val followType = value._1
val followerUid = value._2
val followUid = value._3

if("follow".equals(followType)){
//添加关注:因为涉及多条命令,所以需要使用事务
session.writeTransaction(new TransactionWork[Unit](){
override def execute(tx: Transaction): Unit = {
try{
tx.run("merge (:User {uid:'"+followerUid+"'})")
tx.run("merge (:User {uid:'"+followUid+"'})")
tx.run("match (a:User {uid:'"+followerUid+"'}),(b:User {uid:'"+followUid+"'}) merge (a) -[:follow]-> (b)")
tx.commit()
}catch {
case ex: Exception => tx.rollback()
}
}
})
}else{
//取消关注
session.run("match (:User {uid:'"+followerUid+"'}) -[r:follow]-> (:User {uid:'"+followUid+"'}) delete r")
}
//关闭会话
session.close()
}

/**
* 任务停止的时候会先调用此方法
* 适合关闭资源连接
*/
override def close(): Unit = {
//关闭连接
if(driver!=null){
driver.close()
}
}
}

本地执行

1
2
3
4
5
注意:需要确保zookeeper、kafka服务是正常运行的。

接下来需要产生测试数据,我们可以继续使用之前generate_data项目中的GenerateRealTimeFollowData产生数据,这种流程我们前面在v1.0中已经使用过了。
下面给大家演示一种方便测试的方法
其实我们可以通过kafka的基于console的生产者直接向user_follow这个topic生产数据
1
2
3
4
关注数据
{"followeruid":"2004","followuid":"2008","timestamp":1598771070069,"type":"user_follow","desc":"follow"}
取消关注数据
{"followeruid":"2004","followuid":"2008","timestamp":1598771070069,"type":"user_follow","desc":"unfollow"}

image-20230502121745119

image-20230502121838255

1
2
3
4
5
6
7
8
9
到neo4j中确认效果发现确实新增了一个关注关系
再模拟产生一条粉丝取消关注的数据

到neo4j中确认效果发现刚才新增的关注关系没有了。
这样就说明我们自己定义的Neo4jSink是可以正常工作的。

注意:在实际工作中,有时候为了方便测试代码是否可以正常运行,很多时候也会采用这种基于控制台的生产者直接模拟产生数据,这样不会经过中间商,没有差价!
如果使用整个数据采集全链路流程的话,可能会由于中间某个环节出问题导致的最终看不到效果,此时我们还得排查到底是哪里出了问题,这样就乱套了,本来是要验证代码逻辑的,结果又要去排查其它地方的问题了。
所以说针对流程比较复杂的,我们在测试的时候一块一块进行测试,先验证代码逻辑没问题,最后再跑一个全流程确认一下最终效果。

集群执行

1
2
接下来我们希望把代码提交到集群上运行
需要先调整代码,把参数提取出来
1
2
3
4
5
对项目打jar包
在pom.xml中添加编译打包配置
这里面的scala版本指定的是2.12版本
注意:flink官方建议把所有依赖都打进一个jar包,所以我们在这就把依赖打进一个jar包里面。
在flink1.11的时候新增了一个特性,可以支持动态指定依赖的jar包,但是我测试了还是有bug,所以在这我们就只能把依赖都打进jar包里面,其实我内心是拒绝的。
1
注意:针对log4j,flink相关的依赖在打包的时候不需要打进去,所以需要添加provided属性
1
2
此时我们就需要使用这个带有jar-with-dependencies的jar包了
real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar

startRealTimeFollow.sh

1
2
开发任务提交脚本
startRealTimeFollow.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/bin/bash
masterUrl="yarn-cluster"
appName="RealTimeFollowScala"
kafkaBrokers="bigdata01:9092,bigdata02:9092,bigdata03:9092"
groupId="con_f_1"
topic="user_follow"
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"

#注意:需要将flink脚本路径配置到linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.RealTimeFollowScala \
/data/soft/video_recommend_v2/jobs/real_time_follow-1.0-SNAPSHOT-jar-with-dependencies.jar ${appName} ${kafkaBrokers} ${groupId} ${topic} ${boltUrl} ${userName} ${passWord}
1
注意:在执行之前需要配置flink的环境变量,FLINK_HOME

image-20230502182120542

1
向集群提交任务

image-20230502182148965

1
2
3
通过kafka的console控制台生产者,模拟产生数据,到neo4j中确认效果,发现是没有问题的。

注意:此时是存在数据乱序的问题的,前面在讲Flink的时候我们详细讲解过Flink中的乱序处理方案,在这里给大家留一个作业,对这个代码进行改造,解决数据乱序问题。

每天定时更新主播等级(任务三)

1
2
3
4
5
使用Flink程序实现每天定时更新主播等级
创建子module项目:update_user_level
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖

子项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend_v2</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>update_user_level</artifactId>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
1
在resources目录中添加log4j.properties配置文件

UpdateUserLevelScala.scala

1
2
创建类:UpdateUserLevelScala
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.imooc.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}

/**
* 任务3:
* 每天定时更新主播等级
* Created by xuwei
*/
object UpdateUserLevelScala {
def main(args: Array[String]): Unit = {
var filePath = "hdfs://bigdata01:9000/data/cl_level_user/20260201"
var boltUrl = "bolt://bigdata04:7687"
var userName = "neo4j"
var passWord = "admin"
if(args.length>0){
filePath = args(0)
boltUrl = args(1)
userName = args(2)
passWord = args(3)
}

val env = ExecutionEnvironment.getExecutionEnvironment
//读取hdfs中的数据
val text = env.readTextFile(filePath)

//校验数据准确性
val filterSet = text.filter(line => {
val fields = line.split("\t")
if (fields.length == 8 && !fields(0).equals("id")) {
true
} else {
false
}
})

//添加隐式转换代码
import org.apache.flink.api.scala._

//处理数据
filterSet.mapPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
it.foreach(line=>{
val fields = line.split("\t")
//添加等级
session.run("merge (u:User {uid:'"+fields(1).trim+"'}) set u.level = "+fields(3).trim)
})
//关闭会话
session.close()
//关闭连接
driver.close()
"" // spark的foreachpartition不需要返回数据,flink dataset的mappartition需要返回数据
}).print()
}
}

本地执行

1
2
3
使用之前生成的这份数据
hdfs://bigdata01:9000/data/cl_level_user/20260201
在本地执行代码,到neo4j中确认节点中是否新增了level属性,如果有,就说明程序执行成功了。

集群执行

1
2
接下来对程序编译打包
在pom.xml中添加编译打包配置

startUpdateUserLevel.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/bin/bash
#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi

#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/cl_level_user/${dt}"

masterUrl="yarn-cluster"
appName="UpdateUserLevelScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"

#注意:需要将flink脚本路径配置到linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.UpdateUserLevelScala \
/data/soft/video_recommend_v2/jobs/update_user_level-1.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${boltUrl} ${userName} ${passWord}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

每天定时更新用户活跃时间(任务四)

image-20230503152132924

1
2
3
4
5
使用Flink程序实现每天定时更新用户活跃时间
创建子module项目:update_user_active
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖

子项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend_v2</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>update_user_active</artifactId>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

UpdateUserActiveScala.scala

1
2
3
在resources目录中添加log4j.properties配置文件
创建类:UpdateUserActiveScala
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.imooc.flink

import com.alibaba.fastjson.JSON
import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}

/**
* 任务4:
* 每天定时更新用户活跃时间
* Created by xuwei
*/
object UpdateUserActiveScala {
def main(args: Array[String]): Unit = {
var filePath = "hdfs://bigdata01:9000/data/user_active/20260201"
var boltUrl = "bolt://bigdata04:7687"
var userName = "neo4j"
var passWord = "admin"
if(args.length>0){
filePath = args(0)
boltUrl = args(1)
userName = args(2)
passWord = args(3)
}

val env = ExecutionEnvironment.getExecutionEnvironment
//读取hdfs中的数据
val text = env.readTextFile(filePath)
//添加隐式转换代码
import org.apache.flink.api.scala._

//处理数据
text.mapPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
it.foreach(line=>{
val jsonObj = JSON.parseObject(line)
val uid = jsonObj.getString("uid")
val timeStamp = jsonObj.getString("UnixtimeStamp")
//添加用户活跃时间
session.run("merge (u:User {uid:'"+uid+"'}) set u.timestamp = "+timeStamp)
})
//关闭会话
session.close()
//关闭连接
driver.close()
"" // 和前面一个一样的道理
}).print() // 为了任务能够执行
}
}

本地执行

1
2
3
使用之前生成的这份数据
hdfs://bigdata01:9000/data/user_active/20260201
在本地执行代码,到neo4j中确认节点中是否新增了timestamp属性,如果有,就说明程序执行成功了。

集群执行

1
2
接下来对程序编译打包
在pom.xml中添加编译打包配置

startUpdateUserActive.sh

1
2
开发任务脚本
startUpdateUserActive.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/bin/bash
#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi

#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/user_active/${dt}"

masterUrl="yarn-cluster"
appName="UpdateUserActiveScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"

#注意:需要将flink脚本路径配置到linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.UpdateUserActiveScala \
/data/soft/video_recommend_v2/jobs/update_user_active-1.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${boltUrl} ${userName} ${passWord}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi

每周一计算最近一个月主播视频评级(任务五)

image-20230503153659474

1
2
3
4
5
使用Flink程序实现每周一计算最近一个月主播视频评级
创建子module项目:update_video_info
在项目中创建scala目录,引入scala2.12版本的SDK
创建package:com.imooc.flink
在pom.xml中添加依赖

子项目pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>db_video_recommend_v2</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>update_video_info</artifactId>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

UpdateVideoInfoScala.scala

1
2
3
在resources目录中添加log4j.properties配置文件
创建类:UpdateVideoInfoScala
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.imooc.flink

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.LoggerFactory

/**
* 任务5:
* 每周一计算最近一个月主播视频评级
* 把最近几次视频评级在3B+或2A+的主播,在neo4j中设置flag=1
*
* 注意:在执行程序之前,需要先把flag=1的重置为0
* Created by xuwei
*/
object UpdateVideoInfoScala {
val logger = LoggerFactory.getLogger("UpdateVideoInfoScala")
def main(args: Array[String]): Unit = {
var filePath = "hdfs://bigdata01:9000/data/video_info/20260201"
var boltUrl = "bolt://bigdata04:7687"
var userName = "neo4j"
var passWord = "admin"
if(args.length>0){
filePath = args(0)
boltUrl = args(1)
userName = args(2)
passWord = args(3)
}

//在Driver端执行此代码,将flag=1的值重置为0
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
session.run("match (a:User) where a.flag=1 set a.flag = 0")

//关闭会话
session.close()
//关闭连接
driver.close()

val env = ExecutionEnvironment.getExecutionEnvironment
//读取hdfs中的数据
val text = env.readTextFile(filePath)

//添加隐式转换代码
import org.apache.flink.api.scala._

//解析数据中的uid、rating、timestamp
val tup3Set = text.map(line => {
try {
val jsonObj = JSON.parseObject(line)
val uid = jsonObj.getString("uid")
val rating = jsonObj.getString("rating")
val timestamp: Long = jsonObj.getLong("timestamp") //这里要显示指定类型,不然会报错
(uid, rating, timestamp)
} catch {
case ex: Exception => logger.error("json数据解析失败:" + line)
("0", "0", 0L)
}
})

//过滤异常数据
val filterSet = tup3Set.filter(_._2 != "0")

//获取用户最近3场直播(视频)的评级信息
val top3Set = filterSet.groupBy(0)
.sortGroup(2, Order.DESCENDING)
.reduceGroup(it => {
val list = it.toList
//(2002,A,1769913940002) (2002,A,1769913940001) (2002,A,1769913940000)
//uid,rating,timestamp \t uid,rating,timestamp \t uid,rating,timestamp
val top3 = list.take(3).mkString("\t")
//(2002,(2002,A,1769913940002) (2002,A,1769913940001) (2002,A,1769913940000))
(list.head._1, top3)
})

//过滤出来满足3场B+的数据
val top3BSet = top3Set.filter(tup => {
var flag = false
val fields = tup._2.split("\t")
if (fields.length == 3) {
//3场B+,表示里面没有出现C和D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)
if (!tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})

//把满足3场B+的数据更新到neo4j中,设置flag=1
top3BSet.mapPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match (a:User {uid:'"+tup._1+"'}) where a.level >=15 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
""
}).print()


//过滤出来满足2场A+的数据
val = top3Set.filter(tup => {
var flag = false
val fields = tup._2.split("\t")
if (fields.length >= 2) {
//2场A+,获取最近两场直播评级,里面不能出现B、C、D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1)
if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})

//把满足2场A+的数据更新到neo4j中,设置flag=1
top2ASet.mapPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match (a:User {uid:'"+tup._1+"'}) where a.level >=4 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
"" // 这里的道理和前面一样
}).print()
}
}

本地执行

1
2
3
使用之前生成的这份数据
hdfs://bigdata01:9000/data/video_info/20260201
在本地执行代码,到neo4j中确认节点中是否有flag属性的值。

异常

image-20230503161453968

1
代码中对uid进行分区那里没有显示指定类型,会报错。也是常见错误

image-20230503161026920

1
上面代码mappartion中的数据库链接部分,没有创建session,但并没有报错,用的是外部driver端的session;数据库链接并不支持序列化,不支持从driver到节点的传输。这种异常很常见。

多个输入目录问题

1
注意:flink目前不支持直接读取多个hdfs目录,在spark中,我们可以将多个hdfs目录使用逗号拼接成一个输入路径,flink目前不支持这种用法。

UpdateVideoInfoMoreFileScala.scala

1
2
3
4
5
那如何实现读取最近一个月的数据呢?
我们可以使用flink中的union算子间接实现读取多个hdfs目录的效果
复制一份代码,改名字为:UpdateVideoInfoMoreFileScala

代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.imooc.flink

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.LoggerFactory

/**
* 任务5:
* 每周一计算最近一个月主播视频评级
* 把最近几次视频评级在3B+或2A+的主播,在neo4j中设置flag=1
*
* 注意:在执行程序之前,需要先把flag=1的重置为0
* Created by xuwei
*/
object UpdateVideoInfoMoreFileScala {
val logger = LoggerFactory.getLogger("UpdateVideoInfoScala")
def main(args: Array[String]): Unit = {
var filePath = "hdfs://bigdata01:9000/data/video_info/20260201,hdfs://bigdata01:9000/data/video_info/20260217"
var boltUrl = "bolt://bigdata04:7687"
var userName = "neo4j"
var passWord = "admin"
if(args.length>0){
filePath = args(0)
boltUrl = args(1)
userName = args(2)
passWord = args(3)
}

/*//在Driver端执行此代码,将flag=1的值重置为0
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
session.run("match (a:User) where a.flag=1 set a.flag = 0")

//关闭会话
session.close()
//关闭连接
driver.close()*/

val env = ExecutionEnvironment.getExecutionEnvironment
//添加隐式转换代码
import org.apache.flink.api.scala._

//使用union实现读取多个hdfs目录中的数据
val files = filePath.split(",")
var allText: DataSet[String] = env.fromElements("123")
for(file <- files){
allText = allText.union(env.readTextFile(file)) // 这里要赋值才符合逻辑
}

println("原始数据条数:"+allText.count())

//解析数据中的uid、rating、timestamp
val tup3Set = allText.map(line => {
try {
val jsonObj = JSON.parseObject(line)
val uid = jsonObj.getString("uid")
val rating = jsonObj.getString("rating")
val timestamp: Long = jsonObj.getLong("timestamp")
(uid, rating, timestamp)
} catch {
case ex: Exception => logger.error("json数据解析失败:" + line)
("0", "0", 0L)
}
})

//过滤异常数据
val filterSet = tup3Set.filter(_._2 != "0")
println("过滤后的数据:"+filterSet.count())

//获取用户最近3场直播(视频)的评级信息
val top3Set = filterSet.groupBy(0)
.sortGroup(2, Order.DESCENDING)
.reduceGroup(it => {
val list = it.toList
//(2002,A,1769913940002) (2002,A,1769913940001) (2002,A,1769913940000)
//uid,rating,timestamp \t uid,rating,timestamp \t uid,rating,timestamp
val top3 = list.take(3).mkString("\t")
//(2002,(2002,A,1769913940002) (2002,A,1769913940001) (2002,A,1769913940000))
(list.head._1, top3)
})

//过滤出来满足3场B+的数据
val top3BSet = top3Set.filter(tup => {
var flag = false
val fields = tup._2.split("\t")
if (fields.length == 3) {
//3场B+,表示里面没有出现C和D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)
if (!tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})

//把满足3场B+的数据更新到neo4j中,设置flag=1
top3BSet.mapPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match (a:User {uid:'"+tup._1+"'}) where a.level >=15 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
""
}).print()


//过滤出来满足2场A+的数据
val top2ASet = top3Set.filter(tup => {
var flag = false
val fields = tup._2.split("\t")
if (fields.length >= 2) {
//2场A+,获取最近两场直播评级,里面不能出现B、C、D
val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1)
if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {
flag = true
}
}
flag
})

//把满足2场A+的数据更新到neo4j中,设置flag=1
top2ASet.mapPartition(it=>{
//获取neo4j的连接
val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(userName, passWord))
//开启一个会话
val session = driver.session()
it.foreach(tup=>{
session.run("match (a:User {uid:'"+tup._1+"'}) where a.level >=4 set a.flag = 1")
})
//关闭会话
session.close()
//关闭连接
driver.close()
""
}).print()
}
}

集群执行

startUpdateVideoInfo.sh

1
2
开发任务脚本
startUpdateVideoInfo.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/bin/bash
#获取最近一个月的文件目录
#filepath=""
#for((i=1;i<=30;i++))
#do
# filepath+="hdfs://bigdata01:9000/data/video_info/"`date -d "$i days ago" +"%Y%m%d"`,
#done
#注意:在使用的时候需要将最后面的逗号去掉 ${filePath:0:-1}


#默认获取昨天时间
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi

#HDFS输入数据路径
filePath="hdfs://bigdata01:9000/data/video_info/${dt}"

masterUrl="yarn-cluster"
appName="UpdateVideoInfoScala"`date +%s`
boltUrl="bolt://bigdata04:7687"
userName="neo4j"
passWord="admin"

#注意:需要将flink脚本路径配置到linux的环境变量中
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.imooc.flink.UpdateVideoInfoScala \
/data/soft/video_recommend_v2/jobs/update_video_info-1.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${boltUrl} ${userName} ${passWord}

#验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{tmp=$8;getline;print tmp","$8}'` // 这个程序提交到时候会有两个application,以前的判断方式不适合了
if [ "${appStatus}" != "SUCCEEDED,SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi
1
此时其实会发现产生了两个Flink任务。

image-20230503173125131


本文标题:大数据开发工程师-第十八周 直播平台三度关系推荐v2.0-1

文章作者:TTYONG

发布时间:2023年04月24日 - 15:04

最后更新:2023年06月13日 - 09:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%85%AB%E5%91%A8-%E7%9B%B4%E6%92%AD%E5%B9%B3%E5%8F%B0%E4%B8%89%E5%BA%A6%E5%85%B3%E7%B3%BB%E6%8E%A8%E8%8D%90v2-0-1.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%