大数据开发工程师-第十六周 Flink极速上手篇-Flink核心API之DataSetAPI-4


第十六周 Flink极速上手篇-Flink核心API之DataSetAPI-4

DataSet API

1
2
3
4
5
6
DataSet API主要可以分为3块来分析:DataSource、Transformation、Sink。

DataSource是程序的数据源输入。
Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap、filter等操作。

DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

DataSet API之DataSource

1
2
3
4
5
6
7
针对DataSet批处理而言,其实最多的就是读取HDFS中的文件数据,所以在这里我们主要介绍两个DataSource组件。

基于集合
fromCollection(Collection),主要是为了方便测试使用。它的用法和DataStreamAPI中的用法一样,我们已经用过很多次了。

基于文件
readTextFile(path),读取hdfs中的数据文件。这个前面我们也使用过了。

DataSet API之Transformation

1
2
3
4
5
6
7
算子		 	解释
map 输入一个元素进行处理,返回一个元素
mapPartition 类似map,一次处理一个分区的数据
flatMap 输入一个元素进行处理,可以返回多个元素
filter 对数据进行过滤,符合条件的数据会被留下
reduce 对当前元素和上一次的结果进行聚合操作
aggregate sum(),min(),max()等
1
2
3
4
5
这里面的算子我们都是比较熟悉的,在前面DatatreamAPI中都用过,用法都是一样的,所以在这就不再演示了
mapPartition这个算子我们在Flink中还没用过,不过在Spark中是用过的,用法也是一样的

其实mapPartition就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用mapPartition,这样可以一批数据获取一次连接,提高性能。
下面来演示一下Flink中mapPartition的使用

mapPartition

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.scala.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* MapPartition的使用:一次处理一个分区的数据
* Created by xuwei
*/
object BatchMapPartitionScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//生成数据源数据
val text = env.fromCollection(Array("hello you", "hello me"))
//每次处理一个分区的数据
text.mapPartition(it=>{
//可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
//注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,
val res = ListBuffer[String]()
it.foreach(line=>{
val words = line.split(" ")
for(word <- words){
res.append(word)
}
})
res
//关闭数据库连接
}).print()
//No new data sinks have been defined since the last execution.
//The last execution refers to the latest call to 'execute()', 'count()',
//注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指定execute即可
//env.execute("BatchMapPartitionScala")
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.imooc.java.batch.transformation;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Iterator;
/**
* MapPartition的使用:一次处理一个分区的数据
* Created by xuwei
*/
public class BatchMapPartitionJava {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//生成数据源数据
DataSource<String> text = env.fromCollection(Arrays.asList("hello you","hello me"));
//每次处理一个分区的数据
text.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> iterable, Collector<String> out)
throws Exception {
//可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
Iterator<String> it = iterable.iterator();
while(it.hasNext()){
String line = it.next();
String[] words = line.split(" ");
for(String word: words){
out.collect(word);
}
}
//关闭数据库连接
}
}).print();
}
}
1
2
3
4
5
6
7
8
9
10
下面还有一些transformation算子
算子 解释
distinct 返回数据集中去重之后的元素
join 内连接
outerJoin 外连接
cross 获取两个数据集的笛卡尔积
union 返回多个数据集的总和,数据类型需要一致
first-n 获取集合中的前N个元素
distinct算子比较简单,就是对数据进行全局去重。
join:内连接,可以连接两份数据集

join

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.imooc.scala.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* join:内连接
* Created by xuwei
*/
object BatchJoinScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//初始化第一份数据 Tuple2<用户id,用户姓名>
val text1 = env.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mick")
//初始化第二份数据 Tuple2<用户id,用户所在城市>
val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))
//对两份数据集执行join操作
text1.join(text2)
//注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果
//where:指定左边数据集中参与比较的元素角标
.where(0)
//equalTo指定右边数据集中参与比较的元素角标
.equalTo(0){(first,second)=>{
(first._1,first._2,second._2)
}}.print()
}
}

image-20230410135332014

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.imooc.java.batch.transformation;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
import java.util.Arrays;
/**
* join:内连接
* Created by xuwei
*/
public class BatchJoinJava {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//初始化第一份数据 Tuple2<用户id,用户姓名>
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
data1.add(new Tuple2<Integer,String>(1,"jack"));
data1.add(new Tuple2<Integer,String>(2,"tom"));
data1.add(new Tuple2<Integer,String>(3,"mick"));
DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1)
//初始化第二份数据 Tuple2<用户id,用户所在城市>
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
data2.add(new Tuple2<Integer,String>(1,"bj"));
data2.add(new Tuple2<Integer,String>(2,"sh"));
data2.add(new Tuple2<Integer,String>(4,"gz"));
DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2)
//对两份数据集执行join操作
text1.join(text2)
.where(0)
.equalTo(0)
//三个输入参数:
//第一个tuple2是左边数据集的类型,
//第二个tuple2是右边数据集的类型,
//第三个tuple3是此函数返回的数据集类型
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer,String>,Tuple3<Integer,String,String>>()
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer,String> secondfirst,Tuple2<Integer,String
throws Exception {
return new Tuple3<Integer, String, String>(first.f0,f
}
}).print();
}
}
1
outerJoin:外连接

outerJoin

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.imooc.scala.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* outerJoin:外连接
* 一共有三种情况
* 1:leftOuterJoin
* 2:rightOuterJoin
* 3:fullOuterJoin
* Created by xuwei
*/
object BatchOuterJoinScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//初始化第一份数据 Tuple2<用户id,用户姓名>
val text1 = env.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mick")
//初始化第二份数据 Tuple2<用户id,用户所在城市>
val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))
//对两份数据集执行leftOuterJoin操作
text1.leftOuterJoin(text2)
.where(0)
.equalTo(0){
(first,second)=>{
//注意:second中的元素可能为null
if(second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
}
}.print()
println("========================================")
//对两份数据集执行rightOuterJoin操作
text1.rightOuterJoin(text2)
.where(0)
.equalTo(0){
(first,second)=>{
//注意:first中的元素可能为null
if(first==null){
(second._1,"null",second._2)
}else{
(first._1,first._2,second._2)
}
}
}.print()
println("========================================")
//对两份数据集执行rightOuterJoin操作
text1.fullOuterJoin(text2)
.where(0)
.equalTo(0){
(first,second)=>{
//注意:first和second中的元素都有可能为null
if(first==null){
(second._1,"null",second._2)
}else if(second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
}
}.print()
}
}

image-20230410151623671

image-20230410151648029

image-20230410151719016

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.imooc.java.batch.transformation;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
/**
* outerJoin:外连接
* 一共有三种情况
* 1:leftOuterJoin
* 2:rightOuterJoin
* 3:fullOuterJoin
* Created by xuwei
*/
public class BatchOuterJoinJava {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironme
//初始化第一份数据 Tuple2<用户id,用户姓名>
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
data1.add(new Tuple2<Integer,String>(1,"jack"));
data1.add(new Tuple2<Integer,String>(2,"tom"));
data1.add(new Tuple2<Integer,String>(3,"mick"));
DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1)
//初始化第二份数据 Tuple2<用户id,用户所在城市>
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
data2.add(new Tuple2<Integer,String>(1,"bj"));
data2.add(new Tuple2<Integer,String>(2,"sh"));
data2.add(new Tuple2<Integer,String>(4,"gz"));
DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2)
//对两份数据集执行leftOuterJoin操作
text1.leftOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Intege
@Override
public Tuple3<Integer, String, String> join(Tuple2<Intege
throws Exception {
if(second==null){
return new Tuple3<Integer, String, String>(first.f
}else{
return new Tuple3<Integer, String, String>(first.f
}
}).print();
System.out.println("==============================================");
//对两份数据集执行rightOuterJoin操作
text1.rightOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Intege
@Override
public Tuple3<Integer, String, String> join(Tuple2<Intege
throws Exception {
if(first==null){
return new Tuple3<Integer, String, String>(second
}else{
return new Tuple3<Integer, String, String>(first.f
}
}
}).print();
System.out.println("==============================================");
//对两份数据集执行rightOuterJoin操作
text1.fullOuterJoin(text2)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Intege
@Override
public Tuple3<Integer, String, String> join(Tuple2<Intege
throws Exception {
if(first==null){
return new Tuple3<Integer, String, String>(second
}else if(second==null){
return new Tuple3<Integer, String, String>(first.f
}else{
return new Tuple3<Integer, String, String>(first.f
}
}
}).print();
}
}

cross

1
cross:获取两个数据集的笛卡尔积
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.imooc.scala.batch.transformation
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* cross:获取两个数据集的笛卡尔积
* Created by xuwei
*/
object BatchCrossScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//初始化第一份数据
val text1 = env.fromCollection(Array(1, 2))
//初始化第二份数据
val text2 = env.fromCollection(Array("a", "b"))
//执行cross操作
text1.cross(text2).print()
}
}

image-20230410150858841

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.imooc.java.batch.transformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import java.util.Arrays;
/**
* cross:获取两个数据集的笛卡尔积
* Created by xuwei
*/
public class BatchCrossJava {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironme
//初始化第一份数据
DataSource<Integer> text1 = env.fromCollection(Arrays.asList(1, 2));
//初始化第二份数据
DataSource<String> text2 = env.fromCollection(Arrays.asList("a", "b")
//执行cross操作
text1.cross(text2).print();
}
}
1
2
3
union:返回两个数据集的总和,数据类型需要一致
和DataStreamAPI中的union操作功能一样
first-n:获取集合中的前N个元素

first-n

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.imooc.scala.batch.transformation
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer
/**
* first-n:获取集合中的前N个元素
* Created by xuwei
*/
object BatchFirstNScala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val data = ListBuffer[Tuple2[Int,String]]()
data.append((2,"zs"))
data.append((4,"ls"))
data.append((3,"ww"))
data.append((1,"aw"))
data.append((1,"xw"))
data.append((1,"mw"))
import org.apache.flink.api.scala._
//初始化数据
val text = env.fromCollection(data)
//获取前3条数据,按照数据插入的顺序
text.first(3).print()
println("==================================")
//根据数据中的第一列进行分组,获取每组的前2个元素
text.groupBy(0).first(2).print()
println("==================================")
//根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
//分组排序取TopN
text.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.imooc.java.batch.transformation;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
/**
* first-n:获取集合中的前N个元素
* Created by xuwei
*/
public class BatchFirstNJava {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironme
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<Integer,String>(2,"zs"));
data.add(new Tuple2<Integer,String>(4,"ls"));
data.add(new Tuple2<Integer,String>(3,"ww"));
data.add(new Tuple2<Integer,String>(1,"aw"));
data.add(new Tuple2<Integer,String>(1,"xw"));
data.add(new Tuple2<Integer,String>(1,"mw"));
//初始化数据
DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);
//获取前3条数据,按照数据插入的顺序
text.first(3).print();
System.out.println("====================================");
//根据数据中的第一列进行分组,获取每组的前2个元素
text.groupBy(0).first(2).print();
System.out.println("====================================");
//根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
//分组排序取TopN
text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
}
}

DataSet API之DataSink

1
2
3
4
5
6
7
8
Flink针对DataSet提供了一些已经实现好的数据目的地
其中最常见的是向HDFS中写入数据

writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法

还有一个是print:打印每个元素的toString()方法的值
这个print是测试的时候使用的。

本文标题:大数据开发工程师-第十六周 Flink极速上手篇-Flink核心API之DataSetAPI-4

文章作者:TTYONG

发布时间:2023年04月08日 - 23:04

最后更新:2023年04月20日 - 14:04

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%85%AD%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-Flink%E6%A0%B8%E5%BF%83API%E4%B9%8BDataSetAPI(1.12%E4%B8%AD%E5%B7%B2%E7%BB%8F%E6%B7%98%E6%B1%B0)-4.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%