第十八周 直播平台三度关系推荐v2.0-1
V1.0架构存在的问题
1 | V1.0这个架构里面其实存在三个主要的问题 |

技术选型
1 | 接下来再来回顾一下技术选型,看看在V2.0中有哪些变化 |

1 | 在数据采集这块,去掉了Sqoop,因为在这里我们需要将三度关系数据导出到redis中,sqoop是不支持的,所以我们需要开发flink程序实现数据导出功能。 |
整体架构设计

1 | 这是最新的架构图,在这里面,替换了Spark计算引擎和MySQL数据库,引入了Flink和Redis |
数据采集模块开发
1 | 数据采集模块没有变化,所以在这就不再分析了。 |

数据计算核心指标详细分析

1 | 这里面的数据源和计算指标都没有变化 |
历史粉丝关注数据初始化(任务一)
1 | 第一步:历史粉丝关注数据初始化 |
1 | 通过浏览器访问neo4j,重新设置密码 |

实时维护粉丝关注数据(任务二)

1 | 使用Flink程序实时维护Neo4j中粉丝关注数据 |
父项目pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
子项目pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
RealTimeFollowScala.scala
1 | 在resources目录中添加log4j.properties配置文件 |
1 | package com.imooc.flink |
1 | 注意:由于flink中的实时计算是来一条数据计算一次,在StreamAPI中没有mapPartition方法,不支持一批一批的处理,如果每处理一条数据就获取一次Neo4j数据库连接,这样效率就太差了,所以我们需要实现一个自定义的sink组件,在sink组件内部有一个初始化函数可以获取一次连接,多次使用,这样就不需要频繁创建neo4j数据库连接了。 |
Neo4jSink.scala
1 | package com.imooc.flink |
本地执行
1 | 注意:需要确保zookeeper、kafka服务是正常运行的。 |
1 | 关注数据 |


1 | 到neo4j中确认效果发现确实新增了一个关注关系 |
集群执行
1 | 接下来我们希望把代码提交到集群上运行 |
1 | 对项目打jar包 |
1 | 注意:针对log4j,flink相关的依赖在打包的时候不需要打进去,所以需要添加provided属性 |
1 | 此时我们就需要使用这个带有jar-with-dependencies的jar包了 |
startRealTimeFollow.sh
1 | 开发任务提交脚本 |
1 | !/bin/bash |
1 | 注意:在执行之前需要配置flink的环境变量,FLINK_HOME |

1 | 向集群提交任务 |

1 | 通过kafka的console控制台生产者,模拟产生数据,到neo4j中确认效果,发现是没有问题的。 |
每天定时更新主播等级(任务三)
1 | 使用Flink程序实现每天定时更新主播等级 |
子项目pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
1 | 在resources目录中添加log4j.properties配置文件 |
UpdateUserLevelScala.scala
1 | 创建类:UpdateUserLevelScala |
1 | package com.imooc.flink |
本地执行
1 | 使用之前生成的这份数据 |
集群执行
1 | 接下来对程序编译打包 |
startUpdateUserLevel.sh
1 | !/bin/bash |
每天定时更新用户活跃时间(任务四)

1 | 使用Flink程序实现每天定时更新用户活跃时间 |
子项目pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
UpdateUserActiveScala.scala
1 | 在resources目录中添加log4j.properties配置文件 |
1 | package com.imooc.flink |
本地执行
1 | 使用之前生成的这份数据 |
集群执行
1 | 接下来对程序编译打包 |
startUpdateUserActive.sh
1 | 开发任务脚本 |
1 | !/bin/bash |
每周一计算最近一个月主播视频评级(任务五)

1 | 使用Flink程序实现每周一计算最近一个月主播视频评级 |
子项目pom
1 | <?xml version="1.0" encoding="UTF-8"?> |
UpdateVideoInfoScala.scala
1 | 在resources目录中添加log4j.properties配置文件 |
1 | package com.imooc.flink |
本地执行
1 | 使用之前生成的这份数据 |
异常

1 | 代码中对uid进行分区那里没有显示指定类型,会报错。也是常见错误 |

1 | 上面代码mappartion中的数据库链接部分,没有创建session,但并没有报错,用的是外部driver端的session;数据库链接并不支持序列化,不支持从driver到节点的传输。这种异常很常见。 |
多个输入目录问题
1 | 注意:flink目前不支持直接读取多个hdfs目录,在spark中,我们可以将多个hdfs目录使用逗号拼接成一个输入路径,flink目前不支持这种用法。 |
UpdateVideoInfoMoreFileScala.scala
1 | 那如何实现读取最近一个月的数据呢? |
1 | package com.imooc.flink |
集群执行
startUpdateVideoInfo.sh
1 | 开发任务脚本 |
1 | !/bin/bash |
1 | 此时其实会发现产生了两个Flink任务。 |
