Big Data Engineer - Week 10, Chapter 6: Top N Streamer Statistics



Hands-on: TopN streamer statistics

Requirement: compute the TopN streamers by daily gold-coin income in each region.
Background: we run a live-streaming app that has been launched and operated in many countries for a while. The product manager wants a TopN streamer leaderboard whose rankings are refreshed daily. The leaderboard can be ranked along several dimensions; one of them ranks streamers by the gold coins they earned from that day's streams.

Our platform has the concept of a region: one region contains several countries, and different regions use different operating strategies, so countries are grouped into regions to make operations easier.
The TopN streamer leaderboard therefore has to be computed per region.
We already have each streamer's daily streaming records as well as the gift records sent by viewers in each live room, so a streamer's gold income for the day can be computed from them.
A streamer may go live several times a day, so when computing a streamer's daily income we have to add up the gold from all of that day's streams.
Analysis: we have two data sets, both in JSON format.
video_info.log - the streamers' streaming records, containing the streamer id: uid, live-room id: vid, region: area, stream length: length, new followers: follow, and other fields.

gift_record.log - the viewers' gift records, containing the sender id: uid, live-room id: vid, gift id: good_id, gold amount: gold, and other fields.
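
As a minimal standalone sketch (outside Spark) of how one line of this data can be parsed with fastjson — the field names follow the sample records below, while the trimmed record and the object name are just illustrative:

import com.alibaba.fastjson.JSON

object ParseLineSketch {
  def main(args: Array[String]): Unit = {
    // a trimmed video_info.log record, kept short for illustration
    val line = """{"uid":"8407173251001","vid":"14943445328940001","area":"US","type":"video_info"}"""
    val jsonObj = JSON.parseObject(line)
    // the core fields used later: (vid,(uid,area))
    println((jsonObj.getString("vid"), (jsonObj.getString("uid"), jsonObj.getString("area"))))
  }
}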

video_info.log

{"uid":"8407173251001","vid":"14943445328940001","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":101,"share_num":"21","type":"video_info"}
{"uid":"8407173251002","vid":"14943445328940002","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":201,"share_num":"331","type":"video_info"}
{"uid":"8407173251003","vid":"14943445328940003","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":221,"share_num":"321","type":"video_info"}
{"uid":"8407173251004","vid":"14943445328940004","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":401,"share_num":"311","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940005","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":31,"share_num":"131","type":"video_info"}
{"uid":"8407173251006","vid":"14943445328940006","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":22,"share_num":"3431","type":"video_info"}
{"uid":"8407173251007","vid":"14943445328940007","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":44,"share_num":"131","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940008","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":66,"share_num":"131","type":"video_info"}
{"uid":"8407173251009","vid":"14943445328940009","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":32,"share_num":"231","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940010","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":342,"share_num":"431","type":"video_info"}
{"uid":"8407173251011","vid":"14943445328940011","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":223,"share_num":"331","type":"video_info"}
{"uid":"8407173251012","vid":"14943445328940012","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":554,"share_num":"312","type":"video_info"}
{"uid":"8407173251013","vid":"14943445328940013","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":334,"share_num":"321","type":"video_info"}
{"uid":"8407173251014","vid":"14943445328940014","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":653,"share_num":"311","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940015","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251001","vid":"14943445328940016","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":432,"share_num":"531","type":"video_info"}
{"uid":"8407173251005","vid":"14943445328940017","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":322,"share_num":"231","type":"video_info"}
{"uid":"8407173251008","vid":"14943445328940018","area":"CN","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":564,"share_num":"131","type":"video_info"}
{"uid":"8407173251010","vid":"14943445328940019","area":"ID","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":324,"share_num":"231","type":"video_info"}
{"uid":"8407173251015","vid":"14943445328940020","area":"US","status":"1","start_time":"1494344544","end_time":"1494344570","watch_num":532,"share_num":"331","type":"video_info"}

gift_record.log

{"uid":"7201232141001","vid":"14943445328940001","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141002","vid":"14943445328940001","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141003","vid":"14943445328940002","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141004","vid":"14943445328940002","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141005","vid":"14943445328940003","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141006","vid":"14943445328940003","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141007","vid":"14943445328940004","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141008","vid":"14943445328940004","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141009","vid":"14943445328940005","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141010","vid":"14943445328940005","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141011","vid":"14943445328940006","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141012","vid":"14943445328940006","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141013","vid":"14943445328940007","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141014","vid":"14943445328940007","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141015","vid":"14943445328940008","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141016","vid":"14943445328940008","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141017","vid":"14943445328940009","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141018","vid":"14943445328940009","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141019","vid":"14943445328940010","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141020","vid":"14943445328940010","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141021","vid":"14943445328940011","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141022","vid":"14943445328940011","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141023","vid":"14943445328940012","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141024","vid":"14943445328940012","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141025","vid":"14943445328940013","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141026","vid":"14943445328940013","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141027","vid":"14943445328940014","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141028","vid":"14943445328940014","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141029","vid":"14943445328940015","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141030","vid":"14943445328940015","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141031","vid":"14943445328940016","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141032","vid":"14943445328940016","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141033","vid":"14943445328940017","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141034","vid":"14943445328940017","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141035","vid":"14943445328940018","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141036","vid":"14943445328940018","good_id":"223","gold":"10","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141037","vid":"14943445328940019","good_id":"223","gold":"20","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141038","vid":"14943445328940019","good_id":"223","gold":"30","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141039","vid":"14943445328940020","good_id":"223","gold":"40","timestamp":1494344574,"type":"gift_record"}
{"uid":"7201232141040","vid":"14943445328940020","good_id":"223","gold":"50","timestamp":1494344574,"type":"gift_record"}

Dependency configuration in pom.xml

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency> <!-- needed for fastjson JSON parsing -->
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
</dependencies>

Compile and packaging configuration in pom.xml

<build>
<plugins>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- assembly (packaging) plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
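
With this build configuration, mvn package produces a jar-with-dependencies artifact that can be submitted to a cluster. A rough command sketch follows; the jar name depends on your project's artifactId and version, and the example classes below hard-code local mode and D:\ paths, so they would need to be parameterized for a real cluster run:

mvn clean package -DskipTests

spark-submit \
  --class com.imooc.scala.TopNScala \
  --master yarn \
  --deploy-mode cluster \
  target/<artifactId>-<version>-jar-with-dependencies.jar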

scala

TopN streamer statistics
1: First extract the core fields from the two data sets, parsing the JSON with the fastjson library.
Streaming records (video_info.log): streamer ID: uid, live-room ID: vid, region: area
(vid,(uid,area))
Gift records (gift_record.log): live-room ID: vid, gold amount: gold
(vid,gold)

Joining these two data sets on the live-room id (vid) gives us the region, streamer id and gold amount together.

2: Aggregate the gift records, summing the gold for records with the same vid,
because a viewer may send gifts several times during one stream.
(vid,gold_sum)

3: Join the two data sets, with vid as the join key.
(vid,((uid,area),gold_sum))

4: Use map to iterate over the joined data and pull out the uid, area and gold_sum fields.
Since a streamer may go live several times a day, we still need to aggregate by uid and area afterwards, so the data is converted into the format below.

uid and area map one-to-one: a streamer belongs to exactly one region.
((uid,area),gold_sum)

5: Use reduceByKey to aggregate the data.
((uid,area),gold_sum_all)

6: Next we need groupByKey to group the data, so first use map to reshape it.
Because TopN is computed per region, we group by region.
map:(area,(uid,gold_sum_all))
groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>

7: Use map to iterate over each group, sort by gold amount in descending order, take the first N, and finally output area,topN (a small standalone sketch of this step follows right after this list).
The topN value is simply the top streamers' ids and gold amounts concatenated into a string.
(area,topN)

8: Use foreach to print the result to the console, separating the fields with a tab.
area	topN
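
As a small standalone sketch of the step-7 logic on a plain Scala collection (the uids and gold totals here are made up for illustration):

object Top3Sketch {
  def main(args: Array[String]): Unit = {
    // one group's data after step 6: (uid, gold_sum_all) pairs for a single area
    val group: Iterable[(String, Int)] = Iterable(("u1", 60), ("u2", 180), ("u3", 70), ("u4", 50))
    // sort ascending by gold, reverse for descending, keep the first 3, join as uid:gold,uid:gold,...
    val top3 = group.toList.sortBy(_._2).reverse.take(3)
      .map(t => t._1 + ":" + t._2).mkString(",")
    println(top3) // u2:180,u3:70,u1:60
  }
}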
package com.imooc.scala
import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
/**
* Requirement: TopN streamer statistics
* 1: First extract the core fields from the two data sets, parsing the JSON with the fastjson library
* Streaming records (video_info.log): streamer ID: uid, live-room ID: vid, region: area
* (vid,(uid,area))
* Gift records (gift_record.log): live-room ID: vid, gold amount: gold
* (vid,gold)
*
* Joining these two data sets on the live-room id (vid) gives us the region, streamer id and gold amount together
*
* 2: Aggregate the gift records, summing the gold for records with the same vid,
* because a viewer may send gifts several times during one stream
* (vid,gold_sum)
*
* 3: Join the two data sets, with vid as the join key
* (vid,((uid,area),gold_sum))
*
* 4: Use map to iterate over the joined data and pull out the uid, area and gold_sum fields
* Since a streamer may go live several times a day, we still need to aggregate by uid and area afterwards, so the data is converted into the format below
*
* uid and area map one-to-one: a streamer belongs to exactly one region
* ((uid,area),gold_sum)
*
* 5: Use reduceByKey to aggregate the data
* ((uid,area),gold_sum_all)
*
* 6: Next we need groupByKey to group the data, so first use map to reshape it
* Because TopN is computed per region, we group by region
* map:(area,(uid,gold_sum_all))
* groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
*
* 7: Use map to iterate over each group, sort by gold amount in descending order, take the first N, and finally output area,topN
* The topN value is simply the top streamers' ids and gold amounts concatenated into a string
* (area,topN)
*
* 8: Use foreach to print the result to the console, separating the fields with a tab
* area	topN
* Created by xuwei
*/
object TopNScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("TopNScala")
.setMaster("local")
val sc = new SparkContext(conf)

//1: Extract the core fields from the two data sets, parsing the JSON with fastjson
val videoInfoRDD = sc.textFile("D:\\video_info.log")
val giftRecordRDD = sc.textFile("D:\\gift_record.log")
//(vid,(uid,area))
val videoInfoFieldRDD = videoInfoRDD.map(line=>{
val jsonObj = JSON.parseObject(line)
val vid = jsonObj.getString("vid")
val uid = jsonObj.getString("uid")
val area = jsonObj.getString("area")
(vid,(uid,area))
})
//(vid,gold)
val giftRecordFieldRDD = giftRecordRDD.map(line=>{
val jsonObj = JSON.parseObject(line)
val vid = jsonObj.getString("vid")
val gold = Integer.parseInt(jsonObj.getString("gold"))
(vid,gold)
})

//2: Aggregate the gift records, summing the gold for records with the same vid
//(vid,gold_sum)
val giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey(_ + _)

//3: Join the two data sets, with vid as the join key
//(vid,((uid,area),gold_sum))
val joinRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD)
//the join operation returns (key,(xxxx))

//4: Use map to iterate over the joined data and pull out the uid, area and gold_sum fields
//((uid,area),gold_sum)
val joinMapRDD = joinRDD.map(tup=>{
//joinRDD: (vid,((uid,area),gold_sum))
//get uid
val uid = tup._2._1._1
//get area
val area = tup._2._1._2
//get gold_sum
val gold_sum = tup._2._2
((uid,area),gold_sum)
})

//5: Use reduceByKey to aggregate the data
//((uid,area),gold_sum_all)
val reduceRDD = joinMapRDD.reduceByKey(_ + _)

//6: Next we need groupByKey to group the data, so first use map to reshape it
//map:(area,(uid,gold_sum_all))
//groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
val groupRDD = reduceRDD.map(tup=>(tup._1._2,(tup._1._1,tup._2))).groupByKey()

//7: Use map to iterate over each group, sort by gold amount in descending order, take the first N, and output area,topN
//(area,topN)
val top3RDD = groupRDD.map(tup=>{
val area = tup._1
//toList: convert the Iterable to a List
//sortBy: sort, ascending by default
//reverse: reverse the list to get descending order
//take(3): take the first 3 elements
//mkString: join the collection into a string with the given separator
//uid:gold_sum_all,uid:gold_sum_all,uid:gold_sum_all
val top3 = tup._2.toList.sortBy(_._2).reverse.take(3).map(tup=>tup._1+":"+tup._2).mkString(",")
(area,top3)
})

//8: Use foreach to print the result to the console, separating the fields with a tab
top3RDD.foreach(tup=>println(tup._1+"\t"+tup._2))
sc.stop()
}
}
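
For the sample video_info.log and gift_record.log shown earlier, the console output should look roughly like the lines below (totals worked out by hand from the sample records; the row order is not deterministic, and the third ID entry is a tie at 70 gold, so either 8407173251002 or 8407173251007 may appear there):

US	8407173251015:180,8407173251012:70,8407173251001:60
ID	8407173251005:160,8407173251010:140,8407173251002:70
CN	8407173251008:120,8407173251003:60,8407173251014:50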


java

package com.imooc.java;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
/**
* Requirement: TopN streamer statistics
* Created by xuwei
*/
public class TopNJava {
public static void main(String[] args) {
//create the JavaSparkContext
SparkConf conf = new SparkConf();
conf.setAppName("TopNJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

//1: Extract the core fields from the two data sets, parsing the JSON with fastjson
JavaRDD<String> videoInfoRDD = sc.textFile("D:\\video_info.log");
JavaRDD<String> giftRecordRDD = sc.textFile("D:\\gift_record.log");

//(vid,(uid,area))
JavaPairRDD<String, Tuple2<String, String>> videoInfoFieldRDD = videoInfoRDD.mapToPair(new PairFunction<String,String,Tuple2<String,String>>(){
@Override
public Tuple2<String, Tuple2<String, String>> call(String line)
throws Exception {
JSONObject jsonObj = JSON.parseObject(line);
String vid = jsonObj.getString("vid");
String uid = jsonObj.getString("uid");
String area = jsonObj.getString("area");
return new Tuple2<String, Tuple2<String, String>>(vid, new Tuple2<String,String>(uid,area));
}
});

//(vid,gold)
JavaPairRDD<String, Integer> giftRecordFieldRDD = giftRecordRDD.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String line) throws Exception{
JSONObject jsonObj = JSON.parseObject(line);
String vid = jsonObj.getString("vid");
Integer gold = Integer.parseInt(jsonObj.getString("gold"));
return new Tuple2<String, Integer>(vid, gold);
}
});

//2: Aggregate the gift records, summing the gold for records with the same vid
//(vid,gold_sum)
JavaPairRDD<String, Integer> giftRecordFieldAggRDD = giftRecordFieldRDD.reduceByKey(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});

//3: Join the two data sets, with vid as the join key
//(vid,((uid,area),gold_sum))
//JavaPairRDD<String,Tuple2<Tuple2<String,String>,Integer>> joinRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD);
//4: Use map to iterate over the joined data and pull out the uid, area and gold_sum fields
//((uid,area),gold_sum)
JavaPairRDD<Tuple2<String, String>, Integer> joinMapRDD = videoInfoFieldRDD.join(giftRecordFieldAggRDD).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Integer>>,Tuple2<String,String>,Integer>(){
@Override
public Tuple2<Tuple2<String, String>, Integer> call(Tuple2<String,Tuple2<Tuple2<String,String>,Integer>> tup)
throws Exception {
//joinRDD:(vid,((uid,area),gold_sum))
//get uid
String uid = tup._2._1._1;
//get area
String area = tup._2._1._2;
//get gold_sum
Integer gold_sum = tup._2._2;
return new Tuple2<Tuple2<String, String>, Integer>(new Tuple2<String,String>(uid,area),gold_sum);
}
});
//5: Use reduceByKey to aggregate the data
//((uid,area),gold_sum_all)
JavaPairRDD<Tuple2<String, String>, Integer> reduceRDD = joinMapRDD.reduceByKey(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
//6: Next we need groupByKey to group the data, so first use map to reshape it
//map:(area,(uid,gold_sum_all))
//groupByKey: area,<(uid,gold_sum_all),(uid,gold_sum_all),(uid,gold_sum_all)>
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupRDD = reduceRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>,String,Tuple2<String,Integer>>(){
@Override
public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String,String>,Integer> tup)
throws Exception {
return new Tuple2<String, Tuple2<String, Integer>>(tup._1._2, new Tuple2<String,Integer>(tup._1._1,tup._2));
}
}).groupByKey();
//7: Use map to iterate over each group, sort by gold amount in descending order, take the first N, and output area,topN
//(area,topN)
JavaRDD<Tuple2<String, String>> top3RDD = groupRDD.map(new Function<Tuple2<String, Iterable<Tuple2<String, Integer>>>, Tuple2<String, String>>(){
@Override
public Tuple2<String, String> call(Tuple2<String, Iterable<Tuple2<String, Integer>>> tup)
throws Exception {
String area = tup._1;
ArrayList<Tuple2<String, Integer>> tupleList = Lists.newArrayList(tup._2);
//sort the elements in the list by gold, descending
Collections.sort(tupleList, new Comparator<Tuple2<String, Integer>>(){
@Override
public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
return t2._2 - t1._2;
}
});
StringBuffer sb = new StringBuffer();
for (int i = 0; i < tupleList.size(); i++) {
if (i < 3) {//top 3
Tuple2<String, Integer> t = tupleList.get(i);
if (i != 0) {
sb.append(",");
}
sb.append(t._1 + ":" + t._2);
}
}
return new Tuple2<String, String>(area, sb.toString());
}
});
//8: Use foreach to print the result to the console, separating the fields with a tab
//area topN
top3RDD.foreach(new VoidFunction<Tuple2<String, String>>() {
@Override
public void call(Tuple2<String, String> tup) throws Exception {
System.out.println(tup._1+"\t"+tup._2);
}
});
sc.stop();
}
}
