大数据开发工程师-第十六周 Flink极速上手篇-Flink核心API之DataStreamAPI-3


第十六周 Flink极速上手篇-Flink核心API之DataStreamAPI-3

Flink核心API

image-20230409172217160

1
Flink中提供了4种不同层次的API,每种API在简洁和易表达之间有自己的权衡,适用于不同的场景。目前上面3个会用得比较多。
1
2
3
4
5
6
7
8
9
10
低级API(Stateful Stream Processing):提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在一些复杂事件处理逻辑上。

核心API(DataStream/DataSet API):主要提供了针对流数据和批数据的处理,是对低级API进行了一些封装,提供了filter、sum、max、min等高级函数,简单易用,所以这些API在工作中应用还是比较广泛的。

Table API:一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,然后再使用类似于filter, join,或者select这种操作。最后还可以将一个Table对象转成
DataSet或DataStream。

SQL:Flink的SQL底层是基于Apache Calcite,Apache Calcite实现了标准的SQL,使用起来比其他API更加灵活,因为可以直接使用SQL语句。Table API和SQL可以很容易地结合在一块使用,因为它们都返回Table对象。

针对这些API我们主要学习下面这些

image-20230409172651151

DataStream API

1
2
3
4
5
6
DataStream API主要分为3块:DataSource、Transformation、DataSink。

DataSource是程序的输入数据源。
Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap和filter等操作。

DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

DataStream API之DataSoure

1
2
3
DataSource是程序的输入数据源,Flink提供了大量内置的DataSource,也支持自定义DataSource,不过目前Flink提供的这些已经足够我们正常使用了。
Flink提供的内置输入数据源:包括基于socket、基于Collection
还有就是Flink还提供了一批Connectors,可以实现读取第三方数据源,

image-20230409173429387

1
2
3
4
5
6
7
8
Flink内置:表示Flink中默认自带的。

Apache Bahir:表示需要添加这个依赖包之后才能使用的。

针对source的这些Connector,我们在实际工作中最常用的就是Kafka
当程序出现错误的时候,Flink的容错机制能恢复并继续运行程序,这种错误包括机器故障、网络故障、程序故障等

针对Flink提供的常用数据源接口,如果程序开启了checkpoint快照机制,Flink可以提供这些容错性保证

image-20230409173548905

1
2
3
4
5
针对这些常用的DataSouce,基于socket的我们之前已经用过了,下面我们来看一下基于Collection集合的。

针对Kafka的这个我们在后面会详细分析,在这里先不讲。

由于我们后面还会学到批处理的功能,所以在项目里面创建几个包,把流处理和批处理的代码分开,后期看起来比较清晰。

image-20230409174935218

1
接下来在 com.imooc.scala.stream 里面创建一个包:source,将代码放到source包里面
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.imooc.scala.stream.source
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 基于collection的source的使用
* 注意:这个source的主要应用场景是模拟测试代码流程的时候使用
* Created by xuwei
*/
object StreamCollectionSourceScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//使用collection集合生成DataStream
val text = env.fromCollection(Array(1,2,3,4,5))
text.print().setParallelism(1)
env.execute("StreamCollectionSource")
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.imooc.java.stream.source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* 基于collection的source的使用
* Created by xuwei
*/
public class StreamCollectionSourceJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//使用collection集合生成DataStream
DataStreamSource<Integer> text = env.fromCollection(Arrays.asList(1, 2,3,4));
text.print().setParallelism(1);
env.execute("StreamCollectionSourceJava");
}
}

自定义DataSource(作业)

1
2
3
4
5
6
7
8
9
10
11
12
题目描述:
在Flink流计算中开发自定义Source,实现循环产生从1开始递增+1的数字,然后把这个结果数据直接打印到控制台即可。

效果:
通过自定义Source实现产生从1开始的递增数字

任务要求:
1:Flink中的自定义Source分为支持单并行度和多并行度,在这里使用支持单并行度的自定义Source
2:使用Scala代码开发
任务提示、思路分析:
1:查阅Flink相关资料,实现自定义Source的开发
2:考虑使用SourceFunction
多并行度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class IncreasingNumberParallelSource extends ParallelSourceFunction[Long] {
var isRunning = true
var counter = 0L

override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while (isRunning) {
counter += 1
ctx.collect(counter)
Thread.sleep(1000)
}
}

override def cancel(): Unit = {
isRunning = false
}
}
1
2
3
4
5
6
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new IncreasingNumberParallelSource)
stream.print()
env.execute()
单并行度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.apache.flink.streaming.api.functions.source.SourceFunction

class IncreasingNumberSource extends SourceFunction[Long] {
var isRunning = true
var counter = 0L

override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while (isRunning) {
counter += 1
ctx.collect(counter)
Thread.sleep(1000)
}
}

override def cancel(): Unit = {
isRunning = false
}
}
1
2
3
4
5
6
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new IncreasingNumberSource)
stream.print()
env.execute()

DataStream API之Transformation

1
transformation是Flink程序的计算算子,负责对数据进行处理,Flink提供了大量的算子,其实Flink中的大部分算子的使用和spark中算子的使用是一样的,下面我们来看一下:
1
2
3
4
5
6
7
算子 				解释
map 输入一个元素进行处理,返回一个元素
flatMap 输入一个元素进行处理,可以返回多个元素
filter 对数据进行过滤,符合条件的数据会被留下
keyBy 根据key分组,相同key的数据会进入同一个分区
reduce 对当前元素和上一次的结果进行聚合操作
aggregations sum(),min(),max()等
1
2
3
这里面的算子的用法其实和spark中对应算子的用法是一致的,这里面的map、flatmap、keyBy、reduce、sum这些算子我们都用过了。
所以这里面的算子就不再单独演示了。
往下面看。
1
2
3
4
5
6
7
8
算子 				解	释
union 合并多个流,多个流的数据类型必须一致
connect 只能连接两个流,两个流的数据类型可以不同
split 根据规则把一个数据流切分为多个流
shuffle 随机分区
rebalance 对数据集进行再平衡,重分区,消除数据倾斜
rescale 重分区
partitionCustom 自定义分区
1
2
3
4
这里面的算子我们需要分析一下。
union:表示合并多个流,但是多个流的数据类型必须一致
多个流join之后,就变成了一个流
应用场景:多种数据源的数据类型一致,数据处理规则也一致
union
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.imooc.scala.stream.transformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 合并多个流,多个流的数据类型必须一致
* 应用场景:多种数据源的数据类型一致,数据处理规则也一致
* Created by xuwei
*/
object StreamUnionScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//第1份数据流
val text1 = env.fromCollection(Array(1, 2, 3, 4, 5))
//第2份数据流
val text2 = env.fromCollection(Array(6, 7, 8, 9, 10))
//合并流
val unionStream = text1.union(text2)
//打印流中的数据
unionStream.print().setParallelism(1)
env.execute("StreamUnionScala")
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.imooc.java.stream.transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* 合并多个流,多个流的数据类型必须一致
* 应用场景:多种数据源的数据类型一致,数据处理规则也一致
* Created by xuwei
*/
public class StreamUnionJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//第1份数据流
DataStreamSource<Integer> text1 = env.fromCollection(Arrays.asList(1, 2, 3, 4));
//第2份数据流
DataStreamSource<Integer> text2 = env.fromCollection(Arrays.asList(6, 7, 8, 9));
//合并流
DataStream<Integer> unionStream = text1.union(text2);
//打印流中的数据
unionStream.print().setParallelism(1);
env.execute("StreamUnionJava");
}
}
connect
1
2
3
4
5
connect:只能连接两个流,两个流的数据类型可以不同

两个流被connect之后,只是被放到了同一个流中,它们内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

connect方法会返回connectedStream,在connectedStream中需要使用CoMap、CoFlatMap这种函数,类似于map和flatmap
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.imooc.java.stream.transformation
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 只能连接两个流,两个流的数据类型可以不同
* 应用:可以将两种不同格式的数据统一成一种格式
* Created by xuwei
*/
object StreamConnectScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//第1份数据流
val text1 = env.fromElements("user:tom,age:18")
//第2份数据流
val text2 = env.fromElements("user:jack_age:20")
//连接两个流
val connectStream = text1.connect(text2)
connectStream.map(new CoMapFunction[String,String,String] {
//处理第1份数据流中的数据
override def map1(value: String): String = {
value.replace(",","-")
}
//处理第2份数据流中的数据
override def map2(value: String): String = {
value.replace("_","-")
}
}).print().setParallelism(1)
env.execute("StreamConnectScala")
}
}

image-20230409182145286

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.imooc.java.stream.transformation;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Arrays;
/**
* 只能连接两个流,两个流的数据类型可以不同
* Created by xuwei
*/
public class StreamConnectJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//第1份数据流
DataStreamSource<String> text1 = env.fromElements("user:tom,age:18");
//第2份数据流
DataStreamSource<String> text2 = env.fromElements("user:jack_age:20");
//连接两个流
ConnectedStreams<String, String> connectStream = text1.connect(text2);
connectStream.map(new CoMapFunction<String, String, String>() {
//处理第1份数据流中的数据
@Override
public String map1(String value) throws Exception {
return value.replace(",","-");
}
//处理第2份数据流中的数据
@Override
public String map2(String value) throws Exception {
return value.replace("_","-");
}
}).print().setParallelism(1);
env.execute("StreamConnectJava");
}
}
split
1
2
3
split:根据规则把一个数据流切分为多个流

注意:split只能分一次流,切分出来的流不能继续分流
1
2
split需要和select配合使用,选择切分后的流
应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.imooc.java.stream.transformation
import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 根据规则把一个数据流切分为多个流
* 注意:split只能分一次流,切分出来的流不能继续分流
* split需要和select配合使用,选择切分后的流
* 应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑
* Created by xuwei
*/
object StreamSplitScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
//按照数据的奇偶性对数据进行分流
val splitStream = text.split(new OutputSelector[Int] {
override def select(value: Int): lang.Iterable[String] = {
val list = new util.ArrayList[String]()
if(value % 2 == 0){
list.add("even")//偶数
}else{
list.add("odd")//奇数
}
list
}
})

//选择流
val evenStream = splitStream.select("even")
evenStream.print().setParallelism(1)

//二次切分流会报错
//Consecutive multiple splits are not supported. Splits are deprecated. Pl
/*val lowHighStream = evenStream.split(new OutputSelector[Int] {
override def select(value: Int): lang.Iterable[String] = {
val list = new util.ArrayList[String]()
if(value <= 5){
list.add("low");
}else{
list.add("high")
}
list
}
})
val lowStream = lowHighStream.select("low")
lowStream.print().setParallelism(1)*/
env.execute("StreamSplitScala")
}
}
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.imooc.java.stream.transformation;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.Arrays;
/**
* 根据规则把一个数据流切分为多个流
* 注意:split只能分一次流,切分出来的流不能继续分流
* split需要和select配合使用,选择切分后的流
* 应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑
* Created by xuwei
*/
public class StreamSplitJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> text = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
//按照数据的奇偶性对数据进行分流
SplitStream<Integer> splitStream = text.split(new OutputSelector<Integer>(){
@Override
public Iterable<String> select(Integer value) {
ArrayList<String> list = new ArrayList<>();
if (value % 2 == 0) {
list.add("even");//偶数
} else {
list.add("odd");//奇数
}
return list;
}
});
//选择流
DataStream<Integer> evenStream = splitStream.select("even");
evenStream.print().setParallelism(1);
env.execute("StreamSplitJava");
}
}
1
目前split切分的流无法进行二次切分,并且split方法已经标记为过时了,官方不推荐使用,现在官方推荐使用side output的方式实现。
side output
1
下面我来看一下使用side output如何实现流的多次切分
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.imooc.java.stream.transformation
import java.{lang, util}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnviron
import org.apache.flink.util.Collector
/**
* 使用sideoutput切分流
* Created by xuwei
*/
object StreamSideOutputScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
//按照数据的奇偶性对数据进行分流
//首先定义两个sideoutput来准备保存切分出来的数据
val outputTag1 = new OutputTag[Int]("even"){}//保存偶数
val outputTag2 = new OutputTag[Int]("odd"){}//保存奇数
//注意:process属于Flink中的低级api
val outputStream = text.process(new ProcessFunction[Int,Int] {
override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context,out:Collector[Int]):
if(value % 2 == 0 ){
ctx.output(outputTag1,value)
}else{
ctx.output(outputTag2,value)
}
}
})
//获取偶数数据流
val evenStream = outputStream.getSideOutput(outputTag1)
//获取奇数数据流
val oddStream = outputStream.getSideOutput(outputTag2)
//evenStream.print().setParallelism(1)
//对evenStream流进行二次切分
val outputTag11 = new OutputTag[Int]("low"){}//保存小于等五5的数字
val outputTag12 = new OutputTag[Int]("high"){}//保存大于5的数字
val subOutputStream = evenStream.process(new ProcessFunction[Int,Int] {
override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context,Collector[Int]):
if(value<=5){
ctx.output(outputTag11,value)
}else{
ctx.output(outputTag12,value)
}
}
})
//获取小于等于5的数据流
val lowStream = subOutputStream.getSideOutput(outputTag11)
//获取大于5的数据流
val highStream = subOutputStream.getSideOutput(outputTag12)
lowStream.print().setParallelism(1)
env.execute("StreamSideOutputScala")
}
}

image-20230409231015815

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.imooc.java.stream.transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Arrays;
/**
* 使用sideoutput切分流
* Created by xuwei
*/
public class StreamSideoutputJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecuti
DataStreamSource<Integer> text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
//按照数据的奇偶性对数据进行分流
//首先定义两个sideoutput来准备保存切分出来的数据
OutputTag<Integer> outputTag1 = new OutputTag<Integer>("even") {};
OutputTag<Integer> outputTag2 = new OutputTag<Integer>("odd") {};
SingleOutputStreamOperator<Integer> outputStream = text.process(new ProcessFunction<Integer,Integer>(){
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
throws Exception {
if (value % 2 == 0) {
ctx.output(outputTag1, value);
} else {
ctx.output(outputTag2, value);
}
}
});
//获取偶数数据流
DataStream<Integer> evenStream = outputStream.getSideOutput(outputTag1);
//获取奇数数据流
DataStream<Integer> oddStream = outputStream.getSideOutput(outputTag2);
//对evenStream流进行二次切分
OutputTag<Integer> outputTag11 = new OutputTag<Integer>("low") {};
OutputTag<Integer> outputTag12 = new OutputTag<Integer>("high") {};
SingleOutputStreamOperator<Integer> subOutputStream = evenStream.process(new ProcessFunction<Integer,Integer>(){
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
throws Exception {
if (value <= 5) {
ctx.output(outputTag11, value);
} else {
ctx.output(outputTag12, value);
}
}
});
//获取小于等于5的数据流
DataStream<Integer> lowStream = subOutputStream.getSideOutput(outputTag11);
//获取大于5的数据流
DataStream<Integer> highStream = subOutputStream.getSideOutput(outputTag12);
lowStream.print().setParallelism(1);
env.execute("StreamSideoutputJava");
}
}
总结
1
2
3
其实想要实现多级流切分,使用filter算子也是可以实现的,给大家留一个作业,大家可以下去实验一下。
最后针对这几个算子总结一下:
首先是union和connect的区别,如图所示:

image-20230409231907397

1
2
3
4
5
6
7
union可以连接多个流,最后汇总成一个流,流里面的数据使用相同的计算规则
connect值可以连接2个流,最后汇总成一个流,但是流里面的两份数据相互还是独立的,每一份数据使用一个计算规则


然后是流切分
如果是只需要切分一次的话使用split或者side output都可以
如果想要切分多次,就不能使用split了,需要使用side output

image-20230409231939157

分区相关算子
random(shuffle)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
接下来看一下这几个和分区相关的算子
算子 解释
random 随机分区
rebalance 对数据集进行再平衡,重分区,消除数据倾斜|
rescale 重分区
custom partition 自定义分区

random:随机分区,它表示将上游数据随机分发到下游算子实例的每个分区中,在代码层面体现是调用shuffle()函数

查看源码,shuffle底层对应的是ShufflePartitioner这个类
这个类里面有一个selectChannel函数,这个函数会计算数据将会被发送给哪个分区,里面使用的是random.nextInt,所以说是随机的。

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return random.nextInt(numberOfChannels);
}
rebalance
1
2
3
4
rebalance:重新平衡分区(循环分区),我觉得叫循环分区更好理解,它表示对数据集进行再平衡,消除数据倾斜,为每个分区创建相同的负载,其实就是通过循环的方式给下游算子实例的每个分区分配数据,在代码层面体现是调用rebalance()函数

查看源码,rebalance底层对应的是RebalancePartitioner这个类
这个类里面有一个setup和selectChannel函数,setup函数会根据分区数初始化一个随机值nextChannelToSendTo,然后selectChannel函数会使用nextChannelToSendTo加1和分区数取模,把计算的值再赋给nextChannelToSendTo,后面以此类推,其实就可以实现向下游算子实例的多个分区循环发送数据了,这样每个分区获取到的数据基本一致。
1
2
3
4
5
6
7
8
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels)
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
return nextChannelToSendTo;
}
rescale
1
2
3
rescale:重分区
查看源码,rescale底层对应的是RescalePartitioner这个类
这个类里面有一个selectChannel函数,这里面的numberOfChannels是分区数量,其实也可以认为是我们所说的算子的并行度,因为一个分区是由一个线程负责处理的,它们两个是一一对应的。
1
2
3
4
5
6
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++nextChannelToSendTo >= numberOfChannels) {
nextChannelToSendTo = 0;
}
return nextChannelToSendTo;
}
1
详细的解释在这个类的注释中也是有的
1
2
3
4
5
6
7
8
9
* The subset of downstream operations to which the upstream operation sends
* elements depends on the degree of parallelism of both the upstream and downs
* For example, if the upstream operation has parallelism 2 and the downstream
* has parallelism 4, then one upstream operation would distribute elements to
* downstream operations while the other upstream operation would distribute to
* two downstream operations. If, on the other hand, the downstream operation h
* 2 while the upstream operation has parallelism 4 then two upstream operation
* distribute to one downstream operation while the other two upstream operatio
* distribute to the other downstream operations.
1
2
如果上游操作有2个并发,而下游操作有4个并发,那么上游的1个并发结果循环分配给下游的2个并发操作,上游的另外1个并发结果循环分配给下游的另外2个并发操作。
另一种情况,如果上游有4个并发操作,而下游有2个并发操作,那么上游的其中2个并发操作的结果会分配给下游的一个并发操作,而上游的另外2个并发操作的结果则分配给下游的另外1个并发操作。
1
注意:rescale与rebalance的区别是rebalance会产生全量重分区,而rescale不会。
broadcast
1
2
3
4
broadcast:广播分区,将上游算子实例中的数据输出到下游算子实例的每个分区中,适合用于大数据集Join小数据集的场景,可以提高性能。

查看源码,broadcast底层对应的是BroadcastPartitioner这个类
看这个类中的selectChannel函数代码的注释,提示广播分区不支持选择Channel,因为会输出数据到下游的每个Channel中,就是发送到下游算子实例的每个分区中
1
2
3
4
5
6
7
8
/**
* Note: Broadcast mode could be handled directly for all the output channels
* in record writer, so it is no need to select channels via this method.
*/
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
throw new UnsupportedOperationException("Broadcast partitioner does not sup
}
custom partition
1
2
3
4
custom partition:自定义分区,可以按照自定义规则实现
自定义分区需要实现Partitioner接口

这是针对这几种分区的解释,下面来通过这个图总结一下,加深理解
总结

image-20230409234549859

image-20230409234806351

image-20230409235046659

1
最后使用代码演示一下它们具体的用法
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.imooc.scala.stream.transformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnviro
import org.apache.flink.api.scala._
/**
* 分区规则的使用
* Created by xuwei
*/
object StreamPartitionOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//注意:默认情况下Fink任务中算子的并行度会读取当前机器的CPU个数
//但fromCollection的并行度为1,由源码可知
val text = env.fromCollection(Array(1,2,3,4,5,6,7,8,9,10))

//注意:在这里建议将这个隐式转换代码放到类上面
//因为默认它只在main函数生效,针对下面提取的shuffleOp是无效的,否则也需要在shuffleOp添加这行代码
//import org.apache.flink.api.scala._

//使用shuffle分区规则
//shuffleOp(text)

//使用rebalance分区规则
//rebalanceOp(text)

//使用rescale分区规则
//rescaleOp(text)

//使用broadcast分区规则,此代码一共会打印40条数据,因为print的并行度为4
//broadcastOp(text)

//自定义分区规则:根据数据的奇偶性进行分区
//注意:此时虽然print算子的并行度是4,但是自定义的分区规则只会把数据分发给2个并行度,所以有两个不干活
//custormPartitionOp(text)
env.execute("StreamPartitionOpScala");
}

private def custormPartitionOp(text: DataStream[Int]) = {
text.map(num => num)
.setParallelism(2) //设置map算子的并行度为2
//.partitionCustom(new MyPartitionerScala,0)//这种写法已经过期
.partitionCustom(new MyPartitionerScala, num => num)//官方建议使用keyselector
.print()
.setParallelism(4) //设置print算子的并行度为4
}

private def broadcastOp(text: DataStream[Int]) = {
text.map(num => num)
.setParallelism(2) //设置map算子的并行度为2
.broadcast
.print()
.setParallelism(4) //设置print算子的并行度为4
}

private def rescaleOp(text: DataStream[Int]) = {
text.map(num => num)
.setParallelism(2) //设置map算子的并行度为2
.rescale
.print()
.setParallelism(4) //设置print算子的并行度为4
}

private def rebalanceOp(text: DataStream[Int]) = {
text.map(num => num)
.setParallelism(2) //设置map算子的并行度为2
.rebalance
.print()
.setParallelism(4) //设置print算子的并行度为4
}

private def shuffleOp(text: DataStream[Int]) = {
//由于fromCollection已经设置了并行度为1,所以需要再接一个算子之后才能修改并行度
text.map(num => num)
.setParallelism(2) //设置map算子的并行度为2
.shuffle
.print()
.setParallelism(4) //设置print算子的并行度为4
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.imooc.scala.stream.transformation
import org.apache.flink.api.common.functions.Partitioner
/**
* 自定义分区规则:按照数字的奇偶性进行分区
* Created by xuwei
*/
class MyPartitionerScala extends Partitioner[Int]{
override def partition(key: Int, numPartitions: Int): Int = {
println("分区总数:"+numPartitions)
if(key % 2 == 0){//偶数分到0号分区
0
}else{//奇数分到1号分区
1
}
}
}
1
custom partition

image-20230410002655126

1
broadcast 此时每个数据都有四次输出

image-20230410001436246

1
rescale  四个分区都有输出

image-20230410001313667

1
rebalance 四个分区都有数据输出

image-20230410000959762

1
shuffle  不一定每个分区都有数据输出

image-20230410001045259

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.imooc.java.stream.transformation;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* 分区规则的使用
* Created by xuwei
*/
public class StreamPartitionOpJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecuti
DataStreamSource<Integer> text = env.fromCollection(Arrays.asList(1, 2
//使用shuffle分区规则
//shuffleOp(text);
//使用rebalance分区规则
//rebalanceOp(text);
//使用rescale分区规则
//rescaleOp(text);
//使用broadcast分区规则
//broadcastOp(text);
//自定义分区规则
//custormPartitionOp(text);
env.execute("StreamPartitionOpJava");
}
private static void custormPartitionOp(DataStreamSource<Integer> text) {
text.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
return integer;
}
}).setParallelism(2)
.partitionCustom(new MyPartitionerJava(), new KeySelector<Inte
@Override
public Integer getKey(Integer integer) throws Exception {
return integer;
}
})
.print()
.setParallelism(4);
}
private static void broadcastOp(DataStreamSource<Integer> text) {
text.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
return integer;
}
}).setParallelism(2)
.broadcast()
.print()
.setParallelism(4);
}
private static void rescaleOp(DataStreamSource<Integer> text) {
text.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
return integer;
}
}).setParallelism(2)
.rescale()
.print()
.setParallelism(4);
}
private static void rebalanceOp(DataStreamSource<Integer> text) {
text.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
return integer;
}
}).setParallelism(2)
.rebalance()
.print()
.setParallelism(4);
}
private static void shuffleOp(DataStreamSource<Integer> text) {
text.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
return integer;
}
}).setParallelism(2)
.shuffle()
.print()
.setParallelism(4);
}
}

DataStream API之DataSink

1
2
3
4
5
6
DataSink是 输出组件,负责把计算好的数据输出到其它存储介质中
Flink支持把流数据输出到文件中,不过在实际工作中这种场景不多,因为流数据处理之后一般会存储到一些消息队列里面,或者数据库里面,很少会保存到文件中的。

还有就是print,直接打印,这个其实我们已经用了很多次了,这种用法主要是在测试的时候使用的,方便查看输出的结果信息

Flink提供了一批Connectors,可以实现输出到第三方目的地
1
2
3
4
5
6
7
8
9
Flink内置		 Apache Bahir
Kafka ActiveMQ
Cassandra Flume
Kinesis Streams Redis
Elasticsearch Akka
Hadoop FileSysterm
RabbitMQ
NiFi
JDBC
1
2
3
4
5
6
7
针对sink的这些connector,我们在实际工作中最常用的是kafka、redis
针对Flink提供的常用sink组件,可以提供这些容错性保证

DataSink 容错保证 备注
Redis at least once
Kafka at least once/exactlyonce Kafka0.9和0.10提供at least once,Kafka0.11及以上提供
exactly once

image-20230410120538984

1
2
针对kafka这个sink组件的使用,我们在后面会统一分析,现在我们来使用一下redis这个sink组件
需求:接收Socket传输过来的数据,把数据保存到Redis的list队列中。
redis
1
先到Flink官网,文档,connector,datasteam,redis,添加对应的依赖(一般不是正确的依赖,把名字复制到maven官网,查找)
scala
1
注意:redis sink是在Bahir这个依赖包中,所以在pom.xml中需要添加对应的依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.imooc.scala.stream.sink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoo
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand
/**
* 需求:接收Socket传输过来的数据,把数据保存到Redis的list队列中。
* Created by xuwei
*/
object StreamRedisSinkScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//连接socket获取输入数据
val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._

//组装数据,这里组装的是tuple2类型
//第一个元素是指list队列的key名称
//第二个元素是指需要向list队列中添加的元素
val listData = text.map(word => ("l_words_scala", word))

//指定redisSink
val conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata04").setPort(6379).build()
val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)
listData.addSink(redisSink)
env.execute("StreamRedisSinkScala")
}


class MyRedisMapper extends RedisMapper[Tuple2[String,String]]{
//指定具体的操作命令
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.LPUSH)
}
//获取key
override def getKeyFromData(data: (String, String)): String = {
data._1
}
//获取value
override def getValueFromData(data: (String, String)): String = {
data._2
}
}
}
1
2
3
4
5
6
7
注意:针对List数据类型,我们在定义getCommandDescription方法的时候,使用new
RedisCommandDescription(RedisCommand.LPUSH);。

如果是Hash数据类型,在定义getCommandDescription方法的时候,需要使用new
RedisCommandDescription(RedisCommand.HSET,“hashKey”);,在构造函数中需要直接指定Hash数据类型的key的名称。

注意:执行代码之前,需要先开启socket和redis服务
1
2
3
4
5
6
通过socket传递单词
[root@bigdata04 ~]# nc -l 9001

最终到redis中查看结果
[root@bigdata04 redis-5.0.9]# redis-cli
127.0.0.1:6379> lrange l_words_scala 0 -1

image-20230410123339761

image-20230410123420552

1
最终到redis中查看结果

image-20230410123448085

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.imooc.java.stream.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoo
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandD
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* 需求:接收Socket传输过来的数据,把数据保存到Redis的list队列中。
* Created by xuwei
*/
public class StreamRedisSinkJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入数据
DataStreamSource<String> text = env.socketTextStream("bigdata04", 9001);
//组装数据
SingleOutputStreamOperator<Tuple2<String, String>> listData = text.map(new MapFunction<String,String>{
@Override
public Tuple2<String, String> map(String word) throws Exception {
return new Tuple2<String, String>("l_words_java", word);
}
});
//指定redisSink
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata04").setPort(6379).build();

RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper())
listData.addSink(redisSink);

env.execute("StreamRedisSinkJava");
}

public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>>{
//指定具体的操作命令
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
//获取key
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//获取value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
}
自定义DataSink
1
2
3
4
5
6
7
8
9
10
11
12
13
题目描述:
在Flink流计算中开发自定义Sink,实现将数据保存到MySQL中,使用基于Socket的Source模拟产生数据

效果:
使用Flink将实时产生的数据保存到MySQL数据库中

任务要求:
1:避免频繁创建MySQL数据库连接
2:使用Scala代码开发

任务提示、思路分析:
1:查阅Flink相关资料,实现自定义Sink的开发
2:考虑使用RichSinkFunction
RichSinkFunction
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MySQLSink extends RichSinkFunction[(Int, String)] {
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _

override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/database", "user", "password")
insertStmt = conn.prepareStatement("INSERT INTO myTable (id, name) VALUES (?, ?)")
updateStmt = conn.prepareStatement("UPDATE myTable SET name = ? WHERE id = ?")
}

override def invoke(value: (Int, String), context: SinkFunction.Context): Unit = {
updateStmt.setString(1, value._2)
updateStmt.setInt(2, value._1)
updateStmt.execute()
if (updateStmt.getUpdateCount == 0) {
insertStmt.setInt(1, value._1)
insertStmt.setString(2, value._2)
insertStmt.execute()
}
}

override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
1
2
3
4
5
6
7
8
9
10
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 9999)
.map(line => {
val fields = line.split(",")
(fields(0).toInt, fields(1))
})
stream.addSink(new MySQLSink)
env.execute()
JDBC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 9999)
.map(line => {
val fields = line.split(",")
(fields(0).toInt, fields(1))
})

stream.addSink(
JdbcSink.sink(
"INSERT INTO myTable (id, name) VALUES (?, ?)",
(stmt, value) => {
stmt.setInt(1, value._1)
stmt.setString(2, value._2)
},
new JdbcExecutionOptions.Builder().withBatchSize(1000).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/database")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("user")
.withPassword("password")
.build()
)
)

env.execute()

本文标题:大数据开发工程师-第十六周 Flink极速上手篇-Flink核心API之DataStreamAPI-3

文章作者:TTYONG

发布时间:2023年04月08日 - 23:04

最后更新:2023年05月30日 - 12:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%85%AD%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-Flink%E6%A0%B8%E5%BF%83API%E4%B9%8BDataStreamAPI-3.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%