大数据开发工程师-第十周-第4章-transformation与action实战


第十周 第4章 transformation与action实战

创建RDD

1
2
3
4
5
6
7
8
RDD是Spark编程的核心,在进行Spark编程时,首要任务是创建一个初始的RDD
这样就相当于设置了Spark应用程序的输入源数据
然后在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD

Spark提供三种创建RDD方式:集合、本地文件、HDFS文件
使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件
使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作。

使用集合创建RDD

1
2
3
4
5
6
7
首先来看一下如何使用集合创建RDD

如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。

调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。
Spark会为每一个partition运行一个task来进行处理。
Spark默认会根据集群的配置来设置partition的数量。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:使用集合创建RDD
* Created by xuwei
*/
object CreateRddByArrayScala {
def main(args: Array[String]): Unit = {
//创建SparkContext
val conf = new SparkConf()
conf.setAppName("CreateRddByArrayScala ")//设置任务名称
.setMaster("local")//local表示在本地执行
val sc = new SparkContext(conf)
//创建集合
val arr = Array(1,2,3,4,5)
//基于集合创建RDD
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
println(sum)
//停止SparkContext
sc.stop()
}
}
1
2
3
4
注意:
val arr = Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的,这些代码不会并行执行

parallelize还有reduce之类的操作是在worker节点中执行的

image-20230324161841214

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* 需求:使用集合创建RDD
* Created by xuwei
*/
public class CreateRddByArrayJava {
public static void main(String[] args) {
//创建SparkContext:
SparkConf conf = new SparkConf();
conf.setAppName("CreateRddByArrayJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//创建集合
List<Integer> arr = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(arr);
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
System.out.println(sum);
//停止sparkContext
sc.stop();
}
}

使用本地文件(Linux也算本地)和HDFS文件创建RDD

scala

1
2
3
4
5
下面我们来看一下使用本地文件和HDFS文件创建RDD
通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容

textFile()方法支持针对目录、压缩文件以及通配符创建RDD
Spark默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile()的第二个参数手动设置分区数量,只能比Block数量多,不能比Block数量少,比Block数量少的话你的设置是不生效的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:通过文件创建RDD
* 1:本地文件
* 2:HDFS文件
* Created by xuwei
*/
object CreateRddByFileScala {
def main(args: Array[String]): Unit = {
//创建SparkContext
val conf = new SparkConf()
conf.setAppName("CreateRddByFileScala")//设置任务名称
.setMaster("local")//local表示在本地执行
val sc = new SparkContext(conf)
var path = "D:\\hello.txt"
path = "hdfs://bigdata01:9000/test/hello.txt" (任选其一都行)
//读取文件数据,可以在textFile中指定生成的RDD的分区数量
val rdd = sc.textFile(path,2)
//获取每一行数据的长度,计算文件内数据的总长度
val length = rdd.map(_.length).reduce(_ + _)
println(length)
sc.stop()
}
}

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
* Created by xuwei
*/
public class CreateRddByFileJava {
public static void main(String[] args) {
//创建SparkContext:
SparkConf conf = new SparkConf();
conf.setAppName("CreateRddByFileJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
String path = "D:\\hello.txt";
path = "hdfs://bigdata01:9000/test/hello.txt";
//读取文件数据,可以在textFile中指定生成的RDD的分区数量
JavaRDD<String> rdd = sc.textFile(path,2);
//获取每一行数据的长度
JavaRDD<Integer> lengthRDD = rdd.map(new Function<String, Integer>()
@Override
public Integer call(String line) throws Exception {
return line.length();
}
});
//计算文件内数据的总长度
Integer length = lengthRDD.reduce(new Function2<Integer, Integer, Int
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(length);
sc.stop();
}
}

Transformation和Action

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
接下来我们详细分析一下Spark中对RDD的操作
Spark对RDD的操作可以整体分为两类:
Transformation和Action

这里的Transformation可以翻译为转换,表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter、reduceByKey等等

Action可以翻译为执行,表示是触发任务执行的操作,主要对RDD进行最后的操作,比如foreach、reduce、保存到文件等,并且还可以把结果返回给Driver程序

不管是Transformation里面的操作还是Action里面的操作,我们一般会把它们称之为算子,例如:map算子、reduce算子

其中Transformation算子有一个特性:lazy
lazy特性在这里指的是,如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行。也就是说,transformation是不会触发spark任务的执行,它们只是记录了对RDD所做的操作,不会执行。

只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。

Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。

Action的特性:执行Action操作才会触发一个Spark任务的运行,从而触发这个Action之前所有的Transformation的执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//第一步:创建SparkContext
val conf = new SparkConf()
conf.setAppName("WordCountScala")//设置任务名称
//.setMaster("local")//local表示在本地执行
val sc = new SparkContext(conf)
//第二步:加载数据
var path = "D:\\hello.txt"
if(args.length==1){
path = args(0)
}
//这里通过textFile()方法,针对外部文件创建了一个RDD,linesRDD,实际上,程序执行到这
val linesRDD = sc.textFile(path)
//第三步:对数据进行切割,把一行数据切分成一个一个的单词
//这里通过flatMap算子对linesRDD进行了转换操作,把每一行数据中的单词切开,获取了一个
val wordsRDD = linesRDD.flatMap(_.split(" "))
//第四步:迭代words,将每个word转化为(word,1)这种形式
//这个操作和前面分析的flatMap的操作是一样的,最终获取了一个逻辑上的pairRDD,此时里面
val pairRDD = wordsRDD.map((_,1))
//第五步:根据key(其实就是word)进行分组聚合统计
//这个操作也是和前面分析的flatMap操作是一样的,最终获取了一个逻辑上的wordCountRDD,
val wordCountRDD = pairRDD.reduceByKey(_ + _)
//第六步:将结果打印到控制台
//这行代码执行了一个action操作,foreach,此时会触发之前所有transformation算子的执行
//注意:只有当任务执行到这一行代码的时候任务才会真正开始执行计算,如果任务中没有这一行
wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))
//第七步:停止SparkContext
sc.stop()

常用Transformation介绍

1
2
那下面我们先来看一下Spark中的Transformation算子
先来看一下官方文档,进入2.4.3的文档界面

image-20230324170443067

image-20230324170559110

1
这里面列出了Spark支持的所有的transformation算子

image-20230324170659161

1
在这里我们先讲一些目前常见的transformation算子,个别transformation算子会在后面针对具体的应用场景分析的时候再涉及
1
2
3
4
5
6
7
8
9
算子 介绍
map 将RDD中的每个元素进行处理,一进一出
filter 对RDD中每个元素进行判断,返回true则保留
flatMap 与map类似,但是每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组,每个key对应一个Iterable<value>
reduceByKey 对每个相同key对应的value进行reduce操作
sortByKey 对每个相同key对应的value进行排序操作(全局排序)
join 对两个包含<key,value>对的RDD进行join操作
distinct 对RDD中的元素进行全局去重

Transformation操作开发实战

1
2
3
4
5
6
7
8
9
10
下面我们来针对常见的Transformation来具体写一些案例

map:对集合中每个元素乘以2
filter:过滤出集合中的偶数
flatMap:将行拆分为单词
groupByKey:对每个大区的主播进行分组
reduceByKey:统计每个大区的主播数量
sortByKey:对主播的音浪收入排序
join:打印每个主播的大区信息和音浪收入
distinct:统计当天开播的大区信息
scala

image-20230324223941380

image-20230324173348946

image-20230324173711414

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:Transformation实战
* map:对集合中每个元素乘以2
* filter:过滤出集合中的偶数
* flatMap:将行拆分为单词
* groupByKey:对每个大区的主播进行分组
* reduceByKey:统计每个大区的主播数量
* sortByKey:对主播的音浪收入排序
* join:打印每个主播的大区信息和音浪收入
* distinct:统计当天开播的主播数量
*
* Created by xuwei
*/
object TransformationOpScala {
def main(args: Array[String]): Unit = {
val sc = getSparkContext
//map:对集合中每个元素乘以2
//mapOp(sc)
//filter:过滤出集合中的偶数
//filterOp(sc)
//flatMap:将行拆分为单词
//flatMapOp(sc)
//groupByKey:对每个大区的主播进行分组
//groupByKeyOp(sc)
//groupByKeyOp2(sc)
//reduceByKey:统计每个大区的主播数量
//reduceByKeyOp(sc)
//sortByKey:对主播的音浪收入排序
//sortByKeyOp(sc)
//join:打印每个主播的大区信息和音浪收入
//joinOp(sc)
//distinct:统计当天开播的大区信息
//distinctOp(sc)
sc.stop()
}

def distinctOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"C
//由于是统计开播的大区信息,需要根据大区信息去重,所以只保留大区信息
dataRDD.map(_._2).distinct().foreach(println(_))
}

def joinOp(sc: SparkContext): Unit = {
val dataRDD1 = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")
val dataRDD2 = sc.parallelize(Array((150001,400),(150002,200),(150003,300),(150004,100))
val joinRDD = dataRDD1.join(dataRDD2)
// 返回(id,(area,gold))
//joinRDD.foreach(println(_))
joinRDD.foreach(tup=>{
//用户id
val uid = tup._1
val area_gold = tup._2
//大区
val area = area_gold._1
//音浪收入
val gold = area_gold._2
println(uid+"\t"+area+"\t"+gold)
})
}

def sortByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,400),(150002,200),(150003,300)
//由于需要对音浪收入进行排序,所以需要把音浪收入作为key,在这里要进行位置互换
/*dataRDD.map(tup=>(tup._2,tup._1))
.sortByKey(false)//默认是正序,第一个参数为true,想要倒序需要把这个参数设置为false
.foreach(println(_))*/
//sortBy的使用(其它的By..也有):可以动态指定排序的字段,比较灵活
dataRDD.sortBy(_._2,false).foreach(println(_))
}
def reduceByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"C
//由于这个需求只需要使用到大区信息,所以在map操作的时候只保留大区信息即可
//为了计算大区的数量,所以在大区后面拼上了1,组装成了tuple2这种形式,这样就可以使
dataRDD.map(tup=>(tup._2,1)).reduceByKey(_ + _).foreach(println(_))
}

def groupByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,"US"),(150002,"CN"),(150003,"CN"),(150004,"IN")))
//需要使用map对tuple中的数据进行位置互换,因为我们需要把大区作为key进行分组操作
//此时的key就是tuple中的第一列,其实在这里就可以把这个tuple认为是一个key-value
//注意:在使用类似于groupByKey这种基于key的算子的时候,需要提前把RDD中的数据组装
//此时map算子之后生成的新的数据格式是这样的:("US",150001)
//如果tuple中的数据列数超过了2列怎么办?看groupByKeyOp2
dataRDD.map(tup=>(tup._2,tup._1)).groupByKey().foreach(tup=>{
//获取大区信息
val area = tup._1
print(area+":")
//获取同一个大区对应的所有用户id
val it = tup._2
for(uid <- it){
print(uid+" ")
}
println()
})
}

def groupByKeyOp2(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array((150001,"US","male"),(150002,"CN","female"),(150003,"CN","female"),(150004,"IN","male")
//如果tuple中的数据列数超过了2列怎么办?
//把需要作为key的那一列作为tuple2的第一列,剩下的可以再使用一个tuple2包装一下
//此时map算子之后生成的新的数据格式是这样的:("US",(150001,"male"))
//注意:如果你的数据结构比较复杂,你可以在执行每一个算子之后都调用foreach打印一下
dataRDD.map(tup=>(tup._2,(tup._1,tup._3))).groupByKey().foreach(tup=>{
//获取大区信息
val area = tup._1
print(area+":")
//获取同一个大区对应的所有用户id和性别信息
val it = tup._2
for((uid,sex) <- it){
print("<"+uid+","+sex+"> ")
}
println()
})
}

def flatMapOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array("good good study","day day up"))
dataRDD.flatMap(_.split(" ")).foreach(println(_))
}

def filterOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
dataRDD.filter(_ % 2 ==0).foreach(println(_))
}

def mapOp(sc: SparkContext): Unit ={
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
dataRDD.map(_ * 2).foreach(println(_))

/**
* 获取SparkContext
* @return
*/
private def getSparkContext = {
val conf = new SparkConf()
conf.setAppName("TransformationOpScala")
.setMaster("local")
new SparkContext(conf)
}
}
1
2
3
groupBy
sortBy
reduceBy
1
joinop直接打印joinRDD.foreach(_)

image-20230324212605852

image-20230324213009607

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
package com.imooc.java;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Int;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:
* map:对集合中每个元素乘以2
* filter:过滤出集合中的偶数
* flatMap:将行拆分为单词
* groupByKey:对每个大区的主播进行分组
* reduceByKey:统计每个大区的主播数量
* sortByKey:对主播的音浪收入排序
* join:打印每个主播的大区信息和音浪收入
* distinct:统计当天开播的主播数量
*
* Created by xuwei
*/
public class TransformationOpJava {
public static void main(String[] args) {
JavaSparkContext sc = getSparkContext();
//map:对集合中每个元素乘以2
//mapOp(sc);
//filter:过滤出集合中的偶数
//filterOp(sc);
//flatMap:将行拆分为单词
//flatMapOp(sc);
//groupByKey:对每个大区的主播进行分组
//groupByKeyOp(sc);
//groupByKeyOp2(sc);
//reduceByKey:统计每个大区的主播数量
//reduceByKeyOp(sc);
//sortByKey:对主播的音浪收入排序
//sortByKeyOp(sc);
//join:打印每个主播的大区信息和音浪收入
//joinOp(sc);
//distinct:统计当天开播的主播数量
//distinctOp(sc);
sc.stop();
}
private static void distinctOp(JavaSparkContext sc) {
Tuple2<Integer,String> t1 = new Tuple2<Integer,String>(150001, "US");
Tuple2<Integer,String> t2 = new Tuple2<Integer,String>(150002, "CN");
Tuple2<Integer,String> t3 = new Tuple2<Integer,String>(150003, "CN");
Tuple2<Integer,String> t4 = new Tuple2<Integer,String>(150004, "IN");
JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.map(new Function<Tuple2<Integer, String>, String>() {
@Override
public String call(Tuple2<Integer, String> tup) throws Exception {
return tup._2;
}
}).distinct().foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
}

private static void joinOp(JavaSparkContext sc) {
Tuple2<Integer,String> t1 = new Tuple2<Integer,String>(150001, "US");
Tuple2<Integer,String> t2 = new Tuple2<Integer,String>(150002, "CN");
Tuple2<Integer,String> t3 = new Tuple2<Integer,String>(150003, "CN");
Tuple2<Integer,String> t4 = new Tuple2<Integer,String>(150004, "IN");
Tuple2<Integer,Integer> t5 = new Tuple2<Integer,Integer>(150001, 400);
Tuple2<Integer,Integer> t6 = new Tuple2<Integer,Integer>(150002, 200);
Tuple2<Integer,Integer> t7 = new Tuple2<Integer,Integer>(150003, 300);
Tuple2<Integer,Integer> t8 = new Tuple2<Integer,Integer>(150004, 100);
JavaRDD<Tuple2<Integer, String>> dataRDD_1 = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
JavaRDD<Tuple2<Integer, Integer>> dataRDD_2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
JavaPairRDD<Integer, String> dataRDD1Pair = dataRDD_1.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<Integer, String> tup) throws Exception {
return new Tuple2<>(tup._1, tup._2);
}
});
JavaPairRDD<Integer, Integer> dataRDD2Pair = dataRDD_2.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tup) throws Exception {
return new Tuple2<>(tup._1, tup._2);
}
});
dataRDD1Pair.join(dataRDD2Pair).foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t2) throws Exception {
System.out.println(t2._1+" "+t2._2._1+" "+t2._2._2);
}
});

}

private static void sortByKeyOp(JavaSparkContext sc) {
Tuple2<Integer,Integer> t1 = new Tuple2<Integer,Integer>(150001, 400);
Tuple2<Integer,Integer> t2 = new Tuple2<Integer,Integer>(150002, 200);
Tuple2<Integer,Integer> t3 = new Tuple2<Integer,Integer>(150003, 300);
Tuple2<Integer,Integer> t4 = new Tuple2<Integer,Integer>(150004, 100);
JavaRDD<Tuple2<Integer, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.mapToPair(new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t2) throws Exception {
return new Tuple2<>(t2._2,t2._1);
}
}).sortByKey().foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
@Override
public void call(Tuple2<Integer, Integer> t2) throws Exception {
System.out.println(t2._1+" "+t2._2);
}
});
}

private static void sortBy(JavaSparkContext sc) {
Tuple2<Integer,Integer> t1 = new Tuple2<Integer,Integer>(150001, 400);
Tuple2<Integer,Integer> t2 = new Tuple2<Integer,Integer>(150002, 200);
Tuple2<Integer,Integer> t3 = new Tuple2<Integer,Integer>(150003, 300);
Tuple2<Integer,Integer> t4 = new Tuple2<Integer,Integer>(150004, 100);
JavaRDD<Tuple2<Integer, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() {
@Override
public Integer call(Tuple2<Integer, Integer> tup) throws Exception {
return tup._2;
}
},false,1).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
@Override
public void call(Tuple2<Integer, Integer> t2) throws Exception {
System.out.println(t2._2+" "+t2._1);
}
});
}

private static void reduceByKeyOp(JavaSparkContext sc) {
Tuple2<Integer, String> t1 = new Tuple2<>(150001, "US");
Tuple2<Integer, String> t2 = new Tuple2<>(150002, "CN");
Tuple2<Integer, String> t3 = new Tuple2<>(150003, "IN");
Tuple2<Integer, String> t4 = new Tuple2<>(150004, "US");
JavaRDD<Tuple2<Integer, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
return new Tuple2<>(t._2,1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1+i2;
}
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> t2) throws Exception {
System.out.println(t2._1+" :"+t2._2);
}
});
}

private static void groupByKeyOp(JavaSparkContext sc) {
Tuple2<Integer,String> t1 = new Tuple2<Integer,String>(150001, "US");
Tuple2<Integer,String> t2 = new Tuple2<Integer,String>(150002, "CN");
Tuple2<Integer,String> t3 = new Tuple2<Integer,String>(150003, "CN");
Tuple2<Integer,String> t4 = new Tuple2<Integer,String>(150004, "IN");
JavaRDD<Tuple2<Integer,String>> dataRDD = sc.parallelize(Arrays.asList(t1,t2,t3,t4));
//如果想要使用...ByKey之类的算子,需要先使用...ToPair算子
dataRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>(){
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)
throws Exception {
return new Tuple2<String, Integer>(tup._2,tup._1);
}
}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Interable<Integer>>>(){
@Override
public void call(Tuple2<String, Iterable<Integer>> tup) throws Exception{
//获取大区信息
String area = tup._1;
System.out.print(area+":");
//获取同一个大区对应的所有用户id
Iterable<Integer> it = tup._2;
for (Integer uid: it) {
System.out.print(uid+" ");
}
System.out.println();
}
});
}

private static void groupByKeyOp2(JavaSparkContext sc) {
Tuple3<Integer, String, String> t1 = new Tuple3<>(150001, "US", "male");
Tuple3<Integer, String, String> t2 = new Tuple3<>(150002, "CN", "female");
Tuple3<Integer, String, String> t3 = new Tuple3<>(150003, "IN", "male");
Tuple3<Integer, String, String> t4 = new Tuple3<>(150004, "US", "female");
JavaRDD<Tuple3<Integer, String, String>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.mapToPair(new PairFunction<Tuple3<Integer, String, String>, String, Tuple2<Integer,String>>() {

@Override
public Tuple2<String, Tuple2<Integer, String>> call(Tuple3<Integer, String, String> t) throws Exception {
return new Tuple2<String,Tuple2<Integer,String>>(t._2(),new Tuple2<>(t._1(),t._3()));
}
}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<Integer, String>>>>() {
@Override
public void call(Tuple2<String, Iterable<Tuple2<Integer, String>>> t) throws Exception {
String t1=t._1;
System.out.print(t1+" :");

for(Tuple2<Integer,String> i:t._2){
System.out.println(i._1+" "+i._2);
}
}
});
}

private static void flatMapOp(JavaSparkContext sc) {
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("good good study","day day up"));
dataRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
String[] words = line.split(" ");
return Arrays.asList(words).iterator();
}
}).foreach(new VoidFunction<String>() {
@Override
public void call(String word) throws Exception {
System.out.println(word);
}
}

private static void filterOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
dataRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer i1) throws Exception {
return i1 % 2 == 0;
}
}).foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer i1) throws Exception {
System.out.println(i1);
}
});
}

private static void mapOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
dataRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer i1) throws Exception {
return i1 * 2;
}
}).foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer i1) throws Exception {
System.out.println(i1);
}
});
}

private static JavaSparkContext getSparkContext() {
SparkConf conf = new SparkConf();
conf.setAppName("TransformationOpJava")
.setMaster("local");
return new JavaSparkContext(conf);
}
}

常用Action介绍

image-20230326130931048

1
2
3
4
5
6
7
8
算子 介绍
reduce 将RDD中的所有元素进行聚合操作
collect 将RDD中所有元素获取到本地客户端(Driver)
count 获取RDD中元素总数
take(n) 获取RDD中前n个元素
saveAsTextFile 将RDD中元素保存到文件中,对每个元素调用toString
countByKey 对每个key对应的值进行count计数
foreach 遍历RDD中的每个元素

Action操作开发实战

1
2
3
4
5
6
7
8
下面针对常见的Action算子来写一些具体案例
reduce:聚合计算
collect:获取元素集合
take(n):获取前n个元素
count:获取元素总数
saveAsTextFile:保存文件
countByKey:统计相同的key出现多少次
foreach:迭代遍历元素
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:Action实战
* reduce:聚合计算
* collect:获取元素集合
* take(n):获取前n个元素
* count:获取元素总数
* saveAsTextFile:保存文件
* countByKey:统计相同的key出现多少次
* foreach:迭代遍历元素
*
* Created by xuwei
*/
object ActionOpScala {
def main(args: Array[String]): Unit = {
val sc = getSparkContext
//reduce:聚合计算
//reduceOp(sc)
//collect:获取元素集合
//collectOp(sc)
//take(n):获取前n个元素
//takeOp(sc)
//count:获取元素总数
//countOp(sc)
//saveAsTextFile:保存文件
//saveAsTextFileOp(sc)
//countByKey:统计相同的key出现多少次
//countByKeyOp(sc)
//foreach:迭代遍历元素
//foreachOp(sc)
sc.stop()
}

def foreachOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//注意:foreach不仅限于执行println操作,这个只是在测试的时候使用的
//实际工作中如果需要把计算的结果保存到第三方的存储介质中,就需要使用foreach,
//在里面实现具体向外部输出数据的代码
dataRDD.foreach(println(_))
}

def countByKeyOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))
//返回的是一个map类型的数据
val res = dataRDD.countByKey()
for((k,v) <- res){
println(k+","+v)
}
}

def saveAsTextFileOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//指定HDFS的路径信息即可,需要指定一个不存在的目录
dataRDD.saveAsTextFile("hdfs://bigdata01:9000/out0524")
}

def countOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val res = dataRDD.count()
println(res)
}

def takeOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//从RDD中获取前2个元素
val res = dataRDD.take(2)
for(item <- res){
println(item)
}
}

def collectOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
//collect返回的是一个Array数组
//注意:如果RDD中数据量过大,不建议使用collect,因为最终的数据会返回给Driver进程
//如果想要获取几条数据,查看一下数据格式,可以使用take(n)
val res = dataRDD.collect()
for(item <- res){
println(item)
}
}

def reduceOp(sc: SparkContext): Unit = {
val dataRDD = sc.parallelize(Array(1,2,3,4,5))
val num = dataRDD.reduce(_ + _)
println(num)
}

/**
* @return
*/

private def getSparkContext = {
val conf = new SparkConf()
conf.setAppName("ActionOpScala")
.setMaster("local")
new SparkContext(conf)
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* 需求:Action实战
* reduce:聚合计算
* collect:获取元素集合
* take(n):获取前n个元素
* count:获取元素总数
* saveAsTextFile:保存文件
* countByKey:统计相同的key出现多少次
* foreach:迭代遍历元素
*
* Created by xuwei
*/
public class ActionOpJava {
public static void main(String[] args) {
JavaSparkContext sc = getSparkContext();
//reduce:聚合计算
//reduceOp(sc);
//collect:获取元素集合
//collectOp(sc);
//take(n):获取前n个元素
//takeOp(sc);
//count:获取元素总数
//countOp(sc);
//saveAsTextFile:保存文件
//saveAsTextFileOp(sc);
//countByKey:统计相同的key出现多少次
//countByKeyOp(sc);
//foreach:迭代遍历元素
//foreachOp(sc);
sc.stop();
}

private static void foreachOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5);
dataRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer i) throws Exception {
System.out.println(i);
}
});
}

private static void countByKeyOp(JavaSparkContext sc) {
Tuple2<String, Integer> t1 = new Tuple2<>("A", 1001);
Tuple2<String, Integer> t2 = new Tuple2<>("B", 1002);
Tuple2<String, Integer> t3 = new Tuple2<>("A", 1003);
Tuple2<String, Integer> t4 = new Tuple2<>("C", 1004);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1,t2,t3,t4));
//想要使用countByKey,需要先使用mapToPair对RDD进行转换
Map<String, Long> res = dataRDD.mapToPair(new PairFunction<Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> tup)
throws Exception {
return new Tuple2<String, Integer>(tup._1, tup._2);
}
}).countByKey();
for(Map.Entry<String,Long> entry: res.entrySet()){
System.out.println(entry.getKey()+","+entry.getValue());
}
}

private static void saveAsTextFileOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
dataRDD.saveAsTextFile("hdfs://bigdata01:9000/out05242");
}
private static void countOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
long res = dataRDD.count();
System.out.println(res);
}

private static void takeOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
List<Integer> res = dataRDD.take(2);
for(Integer item : res){
System.out.println(item);
}
}

private static void collectOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5);
List<Integer> res = dataRDD.collect();
for(Integer item : res){
System.out.println(item);
}
}

private static void reduceOp(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5
Integer num = dataRDD.reduce(new Function2<Integer, Integer, Integer>
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
System.out.println(num);

private static JavaSparkContext getSparkContext() {
SparkConf conf = new SparkConf();
conf.setAppName("ActionOpJava")
.setMaster("local");
return new JavaSparkContext(conf);
}
}

本文标题:大数据开发工程师-第十周-第4章-transformation与action实战

文章作者:TTYONG

发布时间:2022年03月02日 - 10:03

最后更新:2023年05月29日 - 12:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%91%A8-%E7%AC%AC4%E7%AB%A0-transformation%E4%B8%8Eaction%E5%AE%9E%E6%88%98.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%