大数据开发工程师-第十七周 Flink极速上手篇-Flink高级进阶之路-1


第十七周 Flink极速上手篇-Flink高级进阶之路-1

Window的概念和类型

1
2
3
4
5
6
7
8
9
大家好,前面我们学习了flink中的基本概念,集群部署以及核心API的使用,下面我们来学习一下flink中的高级特性的使用。首先,我们需要掌握中的window、time以及whatermark使用。然后我们需要掌握kafka-connector使用,这个是针对kafka一个专题。最后我们会学习一下Spark中的流式计算sparkStreaming,之前在学习spark的时候我们没有涉及这块,在这儿我们和flink一块来学习,可以加深理解,因为它们都是流式计算引擎。

下面呢,我们首先进入第一块flink中的window和time。flink认为批处理是流处理的一个特例,所以flink底层引擎是一个流式引擎,这上面呢实现了流处理和批处理。而window呢,就是从流处理到批处理的一个桥梁。通常来讲啊,这个window啊,是一种可以把无界数据切割为有界数据块的手段,例如对流动的所有元素进行计数是不可能的,因为通常流是无限的。或者呢,可以称之为是无界了。所以说流上的聚合需要由window来划分范围,比如计算过去五分钟或者最后100个元素的和。

window可以是以时间驱动的time window,例如每30秒,或者是以数据驱动的count window,例如每100个元素。DataStream API提供了基于time和count的window。同时,由于某些特殊的需要,dataStreamAPI也提供了定制化的window操作,供用户自定义window。

这个window呀,根据类型可以分为这两种。第一种是滚动窗口,它呢表示窗口内的数据没有重叠,第二种呢是滑动窗口,它呢表示窗口内的数据有重叠。

那下面我们来看个图分析一下,首先看这个滚动窗口,这个S轴呢是一个时间轴,你看这个是一个窗口的大小,这是WINDOW1 window2 window3,注意每个窗口内的数据是没有重叠的,这个就是滚动窗口。

image-20230420170417289

1
下面来看这个滑动窗口,这个S轴呢,还是一个时间轴,你看这个是一个window的大小。这个表示是一个window的滑动间隔,这是WINDOW1这个红色的,它这个窗口从这到这儿,下面这个呢,WINDOW2,注意这个窗口它是从这儿到这儿,这个蓝色的看到没有,它里面呢,包含了WINDOW1里面的一部分数据。那你看WINDOW3 window3里面它包含了WINDOW2里面的一部分数据,所以说这个滑动窗口,它们每个窗口之间呀,会有数据重叠,这个就这两种窗口它的一个区别

image-20230420170515739

1
下面我针对这个窗口的类型做了一个汇总。你看这是window window下面有time window有count window还有自定义window,那这些window再往下面你看它呢,可以实现滚动窗口或者滑动窗口,对吧?不管你是基于time的,还是基于count的,还是自定义的,你们都可以实现滚动窗口或者是滑动窗口。

image-20230420170745169

TimeWindow的使用

scala

1
2
3
4
5
6
7
8
下面呢,我们来看一下这些window的具体应用,首先来看第一个time window。time window呢是根据时间对数据流切分窗口,time window可以支持滚动窗口和滑动窗口。

其中它有这么两种用法,来看一下time window。
timeWindow(Time.seconds(10))
注意,首先这个。他呢是表示。滚动窗口的窗口大小为十秒。对每十秒内的数据,进行聚合计算。这个呢,其实就是设置一个滚动窗口。

timeWindow(Time.seconds(10),Time.seconds(5))
那下面这个呢,对应的它设置的就是一个滑动窗口,因为它除了有一个窗口大小,它还滑动一个间隔。表示滑动窗口的窗口大小为十秒,滑动间隔为五秒,就是每隔五秒计算前十秒内的数据,所以说是两种用法,一种是滚动,一种是滑动。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.imooc.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
* TimeWindow的使用
* 1:滚动窗口
* 2:滑动窗口
* Created by xuwei
*/
object TimeWindowOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._

//TimeWindow之滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
/*text.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
//窗口大小
.timeWindow(Time.seconds(10))
.sum(1)
.print()*/

//TimeWindow之滑动窗口:每隔5秒计算一次前10秒时间窗口内的数据
text.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
//第一个参数:窗口大小,第二个参数:滑动间隔
.timeWindow(Time.seconds(10),Time.seconds(5))
.sum(1)
.print()
env.execute("TimeWindowOpScala")
}
}
1
timeWindow滚动窗口

image-20230420172901278

image-20230420172914398

1
timeWindow滑动窗口,黑色第一次输入,蓝色第二次输入

image-20230420173241489

1
第三次打印,蓝色

image-20230420173625064

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.imooc.java.window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
* TimeWindow的使用
* 1:滚动窗口
* 2:滑动窗口
* Created by xuwei
*/
public class TimeWindowOpJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("bigdata04", 9001);

//TimeWindow之滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
/*text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0)
//窗口大小
.timeWindow(Time.seconds(10))
.sum(1)
.print();*/

//TimeWindow之滑动窗口:每隔5秒计算一次前10秒时间窗口内的数据
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0)
//第一个参数:窗口大小,第二个参数:滑动间隔
.timeWindow(Time.seconds(10),Time.seconds(5))
.sum(1)
.print();

env.execute("TimeWindowOpJava");
}
}

CountWindow的使用

1
2
3
4
5
6
下面呢,我们来看一下count的Window的使用,count Window是根据元素个数对数据流切分窗口。count window也可以支持滚动窗口和滑动窗口。
countWindow(5)表示滚动窗口的大小,是五个元素。也就是当窗口中填满五个元素的时候,就会对窗口进行计算
countWindow(5,1)
表示滑动窗口的窗口大小是五个元素,滑动的间隔为一个元素,也就是说每新增一个元素就会对前面五个元素计算一次

那我们再验证一下。有有有。写四个啊。看到没有,这个又要直行了。对吧,你后面再加哈,注意。此时呢,它就不会再执行了,因为你是一个滚动窗口啊,最终呢,你再满足有五个元素之后,它才会重新执行,这是一个滚动窗口。把这个听一下。把这个注意事项啊,我们给它加进来,这是一个解释啊。由于我们在这里使用了可以。会相对数据分组。如果某个分组对应的数据窗口,数据窗口内达到了五个元素,这个窗口才会被主发执行,如果你不使用KPI的话,他就不会在这儿做区分了,所以他接收到所有的数据,在这儿会统一计算。不过那个时候你就需要使用这个count window or这个咱们后面再分析啊,接着我们先使用这个K方式,后面呢直接使用这个count window,好,这是一个滚动窗口,下面呢,我们来实现一个滑动窗口。它的豌豆之滑动窗口。每隔一个元素计算一次前五个元素。map。空格切一下。小点一。零零,它的window,注意第一个参数是窗口大小,第二个是滑动间隔。嗯。窗口大小。第二个参数。滑动间隔。一。BA。好,那接着要把上面这个的读调,嗯。把这个socket呢,再给它打开。嗯。好,那我们到这儿来数数句,hello you。注意它直行了,为什么呀,因为它的滑动间隔是一,只要间隔一个元素,它就会执行,它呢会往前推找五个元素,但是它前面并没有五个元素,就只有这一个,所以说最终的结果呢,就是这样好。那下面呢,我继续往里面添加元素。hello,你。看那个效果,看到没有,hello就两次了,me是一次对吧,hello已经变成两次了,那下面我们还按照刚才这个逻辑。hello,加三次,你看加三次,它其实最终呢,输出了三条如玉,这次是三,这次是四,这次是五。没问题吧,因为你新增一条数据,它就会往前推五条数据去统计。嗯。看到没有2345。这也是可以的啊,然后再加个什么,hello。还是50。you。为什么一直是五次呢?因为它只会往前面统计五个元素啊。好,这就滑动窗口,下面我们来使用Java代码来实现一下。放着window。op加。嗯。嗯嗯。先获取一个环境。get。嗯。嗯。嗯。看window。直滚动方口。每隔五个元素计算一次。前五个元素。加个小碟red map,你有一个red map。注意我们在这呢,还把这个map和map它这个逻辑整合一块,说输入是词频输出是。in。嗯。来。慢点,split。不了。我。在这个。嗯。WORD1。对吧,这样看一下后面一个a。零。嗯。放了window。五。嗯嗯。some。嗯。这个是窗口大小。好,接下来讲第二个把这个注释呢,从这复制一下吧。所以这是每格啊。嗯。好,这个前面啊,其实都一样啊,只有一个地方不一样,对吧。就是把这个放到温度这块,给它改一下就行。和两个参数,嗯。第一个参数窗口大小,第二个参数。滑动间隔。嗯嗯。因为一点。嗯。顺便抛个异常。好,这就可以了,在这我们可以助调一个。验证一下这个滑动窗口。赶紧回来把这个打开。OK。好。没问题吧,没问题啊。这就是Java代码,实现这个count window。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.imooc.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
* CountWindow的使用
* 1:滚动窗口
* 2:滑动窗口
* Created by xuwei
*/
object CountWindowOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._
/**
* 注意:由于我们在这里使用keyBy,会先对数据分组
* 如果某个分组对应的数据窗口内达到了5个元素,这个窗口才会被触发执行
*/
//CountWindow之滚动窗口:每隔5个元素计算一次前5个元素
/*text.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
//指定窗口大小
.countWindow(5)
.sum(1)
.print()*/


//CountWindow之滑动窗口:每隔1个元素计算一次前5个元素
text.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
//第一个参数:窗口大小,第二个参数:滑动间隔
.countWindow(5,1)
.sum(1)
.print()
env.execute("CountWindowOpScala")
}
}
1
滚动窗口

image-20230420175629114

image-20230420175643559

1
所以我在这再输三个,看到没有,到这儿才刚开始执行了一次,他把这个hello打印出来五个。那这个you和me为什么没有打印呢?注意了,所以啊,我们在这啊执行了keyby会对这个数据进行分组,如果某个分组对应的数据窗口内达到了五个元素,这个窗口才会被处罚执行,所以说这个时候相当于是hello对应的那个窗口,它里面够五个元素了,它才会执行。

image-20230420175819357

image-20230420175806865

1
看一下count滑动窗口执行结果

image-20230420180125565

image-20230420180113979

image-20230420180150478

image-20230420180220305

image-20230420180355733

image-20230420180527334

image-20230420180609855

image-20230420180630486

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.imooc.java.window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* CountWindow的使用
* 1:滚动窗口
* 2:滑动窗口
* Created by xuwei
*/
public class CountWindowOpJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("bigdata04", 9001);

//CountWindow之滚动窗口:每隔5个元素计算一次前5个元素
/*text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0)
//窗口大小
.countWindow(5)
.sum(1)
.print();*/

//CountWindow之滑动窗口:每隔1个元素计算一次前5个元素
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0)
//第一个参数:窗口大小,第二个参数:滑动间隔
.countWindow(5,1)
.sum(1)
.print();

env.execute("CountWindowOpJava");
}
}

自定义Window的使用

1
下面呢,我们来看一下自定义window。其实呢,window还可以再细分一下。可以把它分为呢,一种是基于Key的window。一种是不基于Key的window。其实就是说咱们在使用window之前是否执行了key操作啊,咱们前面演示的都是这种基于Key的window。你看我们在做window之前,前面呢都做了Keyby对吧,那如果呢,需求中不需要根据Key进行分组,你在使用window的时候啊,我们需要对应的去使用那个timeWindowAll和countWindowAll。

image-20230420182825536

1
2
3
你使用KeyBy之后的话,它就只能调那个timeWindow,countWindow(这两个也可以用window实现),这个需要注意一下啊,那如果说是我们自定义的window。如何使用呢?对吧,针对这两种情况。来看一下。针对这个基于Key的window呀,我们需要使用这个window函数

那针对下面这种不基于Key的window呢,我们可以直接使用这个windowAll就可以了。其实呀,我们前面所说的那个timewindow和timewindowall(这两个也可以用windowAll实现)底层用的就是这个window和windowall,你可以这样理解timewindow是官方封装好的window。所以说呢,timewindow和countwindow呢,都是官方封装好了。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val input: DataStream[T] = ...

// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)

// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.imooc.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
* 需求:自定义MyTimeWindow
* Created by xuwei
*/
object MyTimeWindowScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("bigdata04", 9001)

import org.apache.flink.api.scala._

//自定义MyTimeWindow滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
text.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
//窗口大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))//注意这里和后面的基于eventtime计算有点不一样
.sum(1)
.print()
env.execute("MyTimeWindowScala")
}
}
1
这个呢,和咱们之前啊使用的什么timewindow那个效果是一样的。这样的话更加灵活一些,我们想怎么定义都可以啊。如果你不使用这个KeyBy的话,那下面你就可以使用windowAll是一样的效果

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.imooc.java.window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
* 需求:自定义MyTimeWindow
* Created by xuwei
*/
public class MyTimeWindowJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("bigdata04", 9001);

//自定义MyTimeWindow滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0)
//窗口大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1)
.print();

env.execute("MyTimeWindowJava");
}
}

Window中的增量聚合和全量聚合

增量聚合

1
2
3
4
5
6
下面呢,我们来看一下Window聚合。就是在进行Window聚合操作的时候呢,可以分为两种情况。一种呢是增量聚合,还有一种是全量聚合。

那下面我们首先来看一下这个增量聚合。增量聚合呢,它表示呀,窗口中每进入一条数据就进行一次计算,常见的一些增量聚合函数如下:
reduce() aggregate() sum() min() max()

那下面呢,我们来看一个增量聚合的案例啊,就是累加求和

image-20230420215009349

1
2
3
它的具体执行过程是这样的。第一次进来一条数据,则立刻进行累加,求和结果为八,第二次进来一条数据12,则立刻进行累加,求和结果为20。第三次进来一条数据七,则立刻进行累加求和,结果为27。第四次进来一条数据,则立刻进行累加求和,结果为37。这就是这个增量聚合它的一个执行流程。

那下面呢,我们来看一下reduce函数的一个使用,从这里面我们可以看出来,reduce是每次获取一条数据和上一次的执行结果求和。也就是来一条数据,立刻计算一次,这个就是增量聚合。

全量聚合

1
2
3
4
那下面呢,我们来看一下全量集合。全量集合呀,它就是等属于窗口的数据都到齐了,才开始进行聚合计算,可以实现对窗口内的数据进行排序等需求。常见的一些全量聚合函数为:
apply(windowFunction),还有这个process(processWindowFunction)
apply呢,它里面接触的是windowfunction,process里面接触是processwindowfunction
注意这个processwindowfunction比windowfunction提供了更多的上下文信息啊。那下面呢,我们来看一个全量聚合的一个案例,求最大值

image-20230420215701906

1
第四次进来调数据10,此时窗口触发,这时候才会对窗口内的数据进行排序,然后获取最大值。
全量聚合apply

image-20230420215834620

1
下面呢,我们来看一下这个apply函数的一个使用。从这你可以看出来,他接触的是一个iterable,可以认为是一个集合。他可以把这个窗口的数据啊,一次性全都传过来,当这个窗口触发的时候,才会真正执行这个代码。
全量聚合process

image-20230420220026246

1
下面呢是一个process。你看他接触的也是一个iterable,所以说呢,你在这里面就可以获取到这个窗口里面的所有数据了。
1
这个呢就是Windows中的全量聚合和增量聚合,后面呢我们就会用到这个apply,还有process它的一个使用,因为有时候我们需要对这个窗口内的所有数据去做一些全量的操作,这样的话就不能用这种增量聚合,而要用这种全量聚合。

Flink中的Time

1
针对流数据的time可以分为以下三种。第一个Event Time表示事件产生的时间,它通常由事件中的时间戳来描述。第二个ingestion time表示事件进入flink的时间。第三个processing time,它表示事件被处理时当前系统的时间,那这几种时间呀,我们通过这个图可以很清晰的看出来它们之间的关系。

image-20230420220641443

1
首先是even time,这个就是数据产生的时间。第二个是ingestion time表示呢,他进入flink时间,其实就是被那个source把它读取过来那个时间。第三个呢,是这个processing time,它其实呢,就是flink里面具体的算子,在处理的时候它的一个时间,那接下来我们来看一个案例。

image-20230420220902227

1
2
3
4
5
注意你看数据呢,是在十点的时候产生的。结果呢,在晚上八点的时候才被flink读取走。那flink真正在处理的时候呢?是8.02秒。

注意,如果说呀,我们想要统计每分钟内接口调用失败的错误日志个数。那这个时候使用哪个时间才有意义呢?因为数据有可能会出现延迟。如果使用那个数据进入flink的时间或者window处理的时间,其实是没有意义的。这个时候我们需要使用原始日中的时间才是有意义的,这个才是数据产生的时间,我们基于这个时间去统计才有意义。

那我们在flink流水中默认使用的是哪个时间呢?某种情况下,flink在流处理中使用的时间是这个processingtime。那如果说我们想要修改的话,怎么改呢?可以使用这个env去改env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)可以设置这个time或者是这个IngestionTime。好,这就是flink中的三种time。

Watermark的分析

背景

image-20230421100013693

1
实时计算中,数据时间比较敏感。有eventTime和processTime区分,一般来说eventTime是从原始的消息中提取过来的,processTime是Flink自己提供的,Flink中一个亮点就是可以基于eventTime计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime显然是不合理的。
1
2
3
4
5
6
前面提到了Time的概念,如果我们使用Processing Time,那么在Flink消费数据的时候,它完全不需要关心数据本身的时间,意思也就是说不需要关心数据到底是延迟数据还是乱序数据。因为Processing Time只是代表数据在Flink被处理时的时间,这个时间是顺序的。

但是如果你使用的是Event Time的话,那么你就不得不面临着这么个问题:事件乱序&事件延迟。

所以…
为了解决这个问题,Flink中引入了WaterMark机制,即水印的概念。

image-20230421100850143

1
2
3
4
5
6
然而在有些场景下,尤其是特别依赖于事件时间而不是处理时间,比如:
错误日志的时间戳,代表着发生的错误的具体时间,开发们只有知道了这个时间戳,才能去还原那个时间点系统到底发生了什么问题,或者根据那个时间戳去关联其他的事件,找出导致问题触发的罪魁祸首
设备传感器或者监控系统实时上传对应时间点的设备周围的监控情况,通过监控大屏可以实时查看,不错漏重要或者可疑的事件
比如我做过的充电桩实时报文分析,就必须依赖报文产生的时间,即事件时间

针对上面的问题(事件乱序 & 事件延迟),Flink引入了Watermark机制来解决。
1
2
3
4
5
统计8:00 ~ 9:00这个时间段打开淘宝App的用户数量,Flink这边可以开个窗口做聚合操作,但是由于网络的抖动或者应用采集数据发送延迟等问题,于是无法保证在窗口时间结束的那一刻窗口中是否已经收集好了在8:00 ~ 9:00中用户打开App的事件数据,但又不能无限期的等下去?

当基于事件时间的数据流进行窗口计算时,最为困难的一点也就是如何确定对应当前窗口的事件已经全部到达。然而实际上并不能百分百的准确判断,因此业界常用的方法就是基于已经收集的消息来估算是否还有消息未到达,这就是Watermark的思想。

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的Watermark。Watermark本质来说就是一个时间戳,代表着比这时间戳早的事件已经全部到达窗口,即假设不会再有比这时间戳还小的事件到达,这个假设是触发窗口计算的基础,只有Watermark大于窗口对应的结束时间,窗口才会关闭和进行计算。按照这个标准去处理数据,那么如果后面还有比这时间戳更小的数据,那么就视为迟到的数据,对于这部分迟到的数据,Flink也有相应的机制(下文会讲)去处理。

概念

1
2
3
4
5
watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
1
2
3
4
5
6
7
8
9
比如:
08:00任务开启,设置1分钟的滚动窗口,在08:00:00-08:01:00为第一个窗口,08:01:00-08:02:00为第二个窗口;
现在有一条数据的事件时间是08:00:50,但是这条数据却在08:01:10到达,按照正常的处理,窗口会在结束时间(08:01:00)的时候就触发计算,那么这条数据就会被丢弃;

但是开启WaterMark后,窗口在08:01:00时不会触发;
因为采用的是EventTime,而数据本身时间是08:00:50,所以该条数据肯定会落到第一个窗口;
假设在08:01:10时的WaterMark为08:01:00(WaterMark可以理解为一个时间戳),发现这个WaterMark和第一个窗口的结束时间相等,此时触发第一个窗口的计算操作,此时这条延迟数据正好参与到计算中;
此时只有水印大于或等于窗口结束时间才会触发窗口的关闭和计算;
此时就不会丢数据。

WaterMark的传递

1
Watermark在向下游传递时,是广播到下游所有的子任务中,如果多并行度下有多个watermark传递到下游时,取最小的watermark。

WaterMark设置

1
2
3
4
5
6
7
8
9
注:如果你采用的是事件时间,即你设置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
那么你就必须设置获取事件时间的方法,否则会报错(如果是从kafka消费数据,不设置水印的话,默认采用kafka消息自带的时间戳作为事件时间)

数据处理中需要通过调用DataStream中的assignTimestampsAndWatermarks方法来分配时间和水印,该方法可以传入两种参数,一个是Assigner With Periodic Watermarks,另一个是Assigner With Punctuated Watermarks。

所以设置Watermark是有如下两种方式:
Assigner With Punctuated Watermarks:数据流中每一个递增的EventTime都会产生一个Watermark。

Assigner With Periodic Watermarks:周期性的(一定时间间隔或者达到一定的记录条数)产生一个 Watermark。
1
实际生产中用第二种的比较多,它会周期性产生Watermark的方式,但是必须结合时间或者积累条数两个维度,否则在极端情况下会有很大的延时。
1
2
3
4
5
6
7
8
通常情况下,在接收到Source的数据后,应该立刻生成Watermark,但是也可以在使用Map或者Filter操作之后,再生成Watermark。

Watermark的生成方式有两种:
With Periodic Watermarks:周期性触发Watermark的生成和发送,每隔N秒自动向流里面注入一个Watermark,时间间隔由ExecutionConfig.setAutoWatermarkInterval决定,现在新版本的Flink默认是200ms。之前默认是100ms
可以定义一个最大允许乱序的时间,这种比较常用。

With Punctuated Watermarks:基于某些事件触发Watermark的生成和发送。
基于事件向流里面注入一个Watermark,每个元素都有机会判断是否生成一个watermark值

开发Watermark代码

乱序数据处理(数据有序)

1
2
3
4
5
6
前面我们学习了whatermark的一些基本原理,可能大家对它还不够了解,下面我们来通过这个案例加深大家对whatermark的理解。我们来分析一下这个案例。乱序数据处理

通过socket模拟数据。数据的格式是这样的。前面的话代表的是具体的业务数据,后边的话是一个时间戳,这是一个毫秒的时间戳。中间用逗号分隔。

其中,时间戳是数据产生的时间。也就是even time。那产生这个数据之后呢?然后使用map函数,把数据转换为tuple2的形式。接着再调用这个函数assignTimestampsAndWatermarks。使用这个方法来抽取timestamp并生成watermark。
接着,再调用window打印信息,来验证window被触发的时机。最后验证乱序数据的处理方式,这是我们一个大致的一个处理流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.imooc.scala.window

import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
* Watermark+EventTime解决数据乱序问题
* Created by xuwei
*/
object WatermarkOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置使用数据产生的时间:EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置全局并行度为1
env.setParallelism(1)

//设置自动周期性的产生watermark,默认值为200毫秒
env.getConfig.setAutoWatermarkInterval(200)

val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
//将数据转换为tuple2的形式
//第一列表示具体的数据,第二列表示是数据产生的时间戳
val tupStream = text.map(line => {
val arr = line.split(",")
(arr(0), arr(1).toLong)
})

//分配(提取)时间戳(EventTime)和watermark
val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) //最大允许的数据乱序时间10s
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var currentMaxTimstamp = 0L

//从数据流中抽取时间戳作为EventTime
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
val timestamp = element._2
currentMaxTimstamp = Math.max(timestamp, currentMaxTimstamp)
//计算当前watermark,为了打印出来方便观察数据,没有别的作用,watermark=currentMaxTimstamp-OutOfOrderness
val currentWatermark = currentMaxTimstamp - 10000L
//此print语句仅仅是为了在学习阶段观察数据的变化
println("key:" + element._1 + "," + "eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimstamp:[" + currentMaxTimstamp + "|" + sdf.format(currentMaxTimstamp) + "],watermark:[" + currentWatermark + "|" + sdf.format(currentWatermark) + "]")
element._2
}
})
)

waterMarkStream.keyBy(0)
//按照消息的EventTime分配窗口,和调用TimeWindow效果一样(这里和前面自定义window时,传的参数有点不一样,这里是event)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//使用全量聚合的方式处理window中的数据
.apply(new WindowFunction[Tuple2[String,Long],String,Tuple,TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val keyStr = key.toString
//将window中的数据保存到arrBuff中
val arrBuff = ArrayBuffer[Long]()
input.foreach(tup=>{
arrBuff.append(tup._2)
})
//将arrBuff转换为arr
val arr = arrBuff.toArray
//对arr中的数据进行排序
Sorting.quickSort(arr)

val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//将目前window内排序后的数据,以及window的开始时间和window的结束时间打印出来,便于观察
val result = keyStr+","+arr.length+","+sdf.format(arr.head)+","+sdf.format(arr.last)+","+sdf.format(window.getStart)+","+sdf.format(window.getEnd)
out.collect(result)
}
}).print()
env.execute("WatermarkOpScala")
}
}
根据数据跟踪观察Watermark

image-20230421142543318

image-20230421142830892

image-20230421143511791

image-20230421143817886

image-20230421144017493

image-20230421144059007

image-20230421144132566

image-20230421144203353

1
到这里,window仍然没有被触发,此时watermark的时间已经等于第一条数据的eventtime了

image-20230421144516628

image-20230421144534532

1
window仍然没有被触发,此时,我们数据已经发送到2026-10-01 10:11:33了,根据eventtime来算,最早的数据已经过去了11s了,window还没开始计算,那到底什么时候会触发window呢?

image-20230421145025475

image-20230421145103798

image-20230421145208979

1
2
到这里,我们做了一个说明。
window的触发机制,是先按照自然时间将window划分,如果window大小是3s,那么1min内会把window划分成如下的形式(左闭右开的区间)

image-20230421145940525

image-20230421150008692

1
2
3
4
window的设定无关数据本身,而是系统定义好了的。
输入的数据,根据自身的eventtime,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=eventtime时,就符合了window触发的条件了,最终决定window触发,还是由eventtime所属window中的window_end_time决定。

上面的测试中,最后一条数据到达后,其水位线(watermark)已经上升至10:11:24,正好是最早的一条记录所在window的window_end_time,所以window就被触发了

image-20230421151156542

image-20230421151309893

1
2
此时,watermark时间虽然已经等于第二条数据的时间,但是由于其没有达到第二条数据所在window,但是由于其没有达到第二条数据所在window的结束时间,所以window并没有被触发。那么,第二条数据所在的window时间区间如下。 
[00:00:24,00:00:27)
1
2
也就是说,我们必须输入一个10:11:37的数据,第二条数据所在的window才会被触发,我们继续输入。
0001,1790820697000

image-20230421152050886

1
2
3
此时,我们已经看到,window的触发条件要符合以下几个条件:
1.watermark时间>=wind_end_time
2.在[window_start_time,window_end_time)区间中有数据存在(注意是左闭右开的区间)

Watermark+EventTime(处理乱序数据)

1
2
我们前面那个测试啊,数据呢,都是按照这个时间顺序递增的,都是有序的,那现在呢,我们来输入一些的数据,来看看这个whatmark,结合这个一的eventtime机制是如何处理这些乱写数据的。那我们在上面那个基础之上啊,再输入两行数据。
注意这个呢,没有触发对吧

image-20230421190455535

image-20230421190548292

1
我们再输入一条43秒的数据

image-20230421191608587

image-20230421191923374

image-20230421192030363

1
2
3
大家注意他没有这个33的。我这个窗口呢,是[30,33),你看我这个一的代表里面数据是有这个33的,为什么这个33没有输出呢。因为这个窗口啊,它是一个左闭右开的。那这个33的话,它其实啊,属于下一个窗口,就是33到36的那个窗口。

好。所以上面这个结果其实已经表明对迟到的数据了,flink可以通过这个watermark来实现处理一定范围内的乱序数据。因为现在我们允许的最大乱序时间是十秒。就是十秒之内乱序是OK的,那如果超过了这个十秒怎么办?也就是说呢,对于这个迟到(late element)太久的数据,flink是怎么处理的呢?

延时数据的三种处理方式

丢弃(默认)
1
2
3
4
5
6
下面呢,我们就来看一下针对迟到太久的数据,它的一些处理方案,现在呢一共有三种。
第一种是丢弃默认的啊。那我们首先呢,来输入一个乱序很多的数据来测试一下(其实只要EventTime<watermark时间)

它这里是程序重启重新输的数据
0001,1790820690000
0001,1790820703000

image-20230422102536854

1
2
3
4
再输几个EventTime小于Watermark的时间
0001,1790820690000
0001,1790820691000
0001,1790820692000

image-20230422102748626

1
看到没有,这三条他都没有触发这个窗口的执行啊,因为你现在你输入的数据所在的窗口已经执行过了。flink默认对这些迟到的数据的处理方案就是丢弃。这几条数据,30对应的那个窗口数据是不是已经执行过了呀,那这样过来它直接丢弃,这是默认的一个处理方案。
allowedLateness
1
2
3
4
那接下来看第二种,你可以通过这个allowedLateness来指定一个允许数据延迟的时间。
本身啊,我们之前通过那个watermark已经设置了一个数据的延迟时间是十秒,对吧。你可以通过这个参数啊,再给他指定一个延迟时间,就类似于我们上班打卡官方延迟对吧,类似于公司统一层面允许大家呢弹性半小时。但是你们这个部门呢,可以再多谈十分钟,有这种效果。

在某些情况下,我们希望对迟到的数据再提供一个宽容时间。那flink提供了这个方法,可以实现对迟到的数据啊,再给它设置一个延迟时间,在指定延迟时间内到达数据还是可以触发window执行的。所以这时候我们需要去改一下代码了。主要呢,就增加这一行就行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package com.imooc.scala.window

import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
* Watermark+EventTime解决数据乱序问题
* Created by xuwei
*/
object WatermarkOpForAllowedLatenessScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置使用数据产生的时间:EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置全局并行度为1
env.setParallelism(1)

//设置自动周期性的产生watermark,默认值为200毫秒
env.getConfig.setAutoWatermarkInterval(200)


val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
//将数据转换为tuple2的形式
//第一列表示具体的数据,第二列表示是数据产生的时间戳
val tupStream = text.map(line => {
val arr = line.split(",")
(arr(0), arr(1).toLong)
})

//分配(提取)时间戳和watermark
val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) //最大允许的数据乱序时间10s
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var currentMaxTimstamp = 0L

//从数据流中抽取时间戳作为EventTime
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
val timestamp = element._2
currentMaxTimstamp = Math.max(timestamp, currentMaxTimstamp)
//计算当前watermark,为了打印出来方便观察数据,没有别的作用,watermark=currentMaxTimstamp-OutOfOrderness
val currentWatermark = currentMaxTimstamp - 10000L
//此print语句仅仅是为了在学习阶段观察数据的变化
println("key:" + element._1 + "," + "eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimstamp:[" + currentWatermark + "|" + sdf.format(currentMaxTimstamp) + "],watermark:[" + currentWatermark + "|" + sdf.format(currentWatermark) + "]")
element._2
}
})
)

waterMarkStream.keyBy(0)
//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//允许数据迟到2秒
.allowedLateness(Time.seconds(2))
//使用全量聚合的方式处理window中的数据
.apply(new WindowFunction[Tuple2[String,Long],String,Tuple,TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val keyStr = key.toString
//将window中的数据保存到arrBuff中
val arrBuff = ArrayBuffer[Long]()
input.foreach(tup=>{
arrBuff.append(tup._2)
})
//将arrBuff转换为arr
val arr = arrBuff.toArray
//对arr中的数据进行排序
Sorting.quickSort(arr)

val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//将目前window内排序后的数据,以及window的开始时间和window的结束时间打印出来,便于观察
val result = keyStr+","+arr.length+","+sdf.format(arr.head)+","+sdf.format(arr.last)+","+sdf.format(window.getStart)+","+sdf.format(window.getEnd)
out.collect(result)
}
}).print()

env.execute("WatermarkOpScala")
}
}
1
这里又是重启之后了

image-20230422104446863

image-20230422104738555

1
那下面呢,我们再输入几条eventtime小于watermark的一个数据来验证一下效果。好,来看一下。注意你会发现,你看。这三条数据过来的时候,窗口同样被触发了,因为之前的话,我们是这个30到33这个窗口对吧。我在这输的这三条数据,一个是30秒了,31、32,它们都属于那个窗口。岁数,你看窗口都被吃光。你看这时候打印的窗口数据是两条,这是三条,这是四条对吧。所以说呢,每条数据都触发了window的执行啊。这三条数据。那下面我们再输一条数据。

image-20230422105039949

image-20230422105851955

image-20230422110036594

1
这时候啊,我们把这个whatmark呀,给它调到34。往上面调一下。看到没有,这次呢,它是没有触发的啊是34。数据呢是44,这样的话,whatmark变成了34。

image-20230422110328208

image-20230422110533985

1
此时呢,把whatmark上升到了34。此时呢,我们再输入几条这种迟到的数据来验证一下效果。因为刚才的话,我们验证了它是可以执行的啊。嗯。结果你会发现,看到没有,这三条又执行了。我们发现数的这三条数据呢,它都触发了window执行。

image-20230422110715967

image-20230422110824496

1
下面我们再输入一行数据,把watermark调到35。

image-20230422111217143

1
注意下面我们再输入Eventtime<Watermark的数据,还是那个三十三十一三十二啊。嗯。

image-20230422111430693

image-20230422111654208

1
2
3
4
5
6
7
8
9
10
11
12
13
注意此时这个窗口就不会触发,相当于啊,你这个时候你这个迟到的数据啊,我就不管了。

来分析一下。你看这几条数据啊,它都没有触发window啊。当这个whatermark等于这个33的时候,它正好呢,是属于这个window_end_time对吧,正好相等,所以说呢,它会触发这个[30到33)窗口执行。

当这个窗口执行过后啊,我们再输入[30到33)这个窗口内的数据的时候呢,会发现这个窗口是可以被触发的。

当我们把这个watermark提升到34秒的时候,我们再输入这个窗口内的数据,发现window还是可以被触发的。

那当我们把这个watermark升到35的时候,再输入这个窗口数据,发现window不会被触发。这是为什么呢?这是因为我们在前面设置了allowedLateness(Time.seconds(2))这个参数。又给它多加了两秒延迟,因此呢,可以允许延迟在两秒内的数据继续触发window执行。所以说当watermark等于34的时候,是可以触发window的,但是35就不行了,这个需要注意一下。

总结如下,对于这个窗口而言啊,它允许两秒的迟到数据,也就是说呢,你第一次触发是在watermark>=window_end_time的时候

当这个watermark=35的时候呢,我们再去输入这个eventtime为三十三十一三十二,这些数据的window_end_time都是33,此时35<33+2为false,所以不会再触发window执行了
总结

image-20230422112107568

image-20230422112129357

image-20230422160343061

sideOutputLateData
1
2
3
下面呢,还有一种处理方案。这个呢,就是收集迟到数据。通过这个函数呢,可以把迟到的数据啊,给它统一收集,统一存储,方便后期排查

注意咱们刚才讲那个第二种方案,其实可以和第三种结合到一块儿来使用,都是可以的啊,你再给他延迟两秒,如果说他还是没有到达,对吧,那你就把它保存起来,丢了保存起来。当然也可以单独使用,都是可以的啊,这个需要具体根据你们的业务需求来定。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package com.imooc.scala.window

import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
* Watermark+EventTime解决数据乱序问题
* Created by xuwei
*/
object WatermarkOpForSideOutputLateDataScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置使用数据产生的时间:EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置全局并行度为1
env.setParallelism(1)

//设置自动周期性的产生watermark,默认值为200毫秒
env.getConfig.setAutoWatermarkInterval(200)


val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
//将数据转换为tuple2的形式
//第一列表示具体的数据,第二列表示是数据产生的时间戳
val tupStream = text.map(line => {
val arr = line.split(",")
(arr(0), arr(1).toLong)
})

//分配(提取)时间戳和watermark
val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) //最大允许的数据乱序时间 10s
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var currentMaxTimstamp = 0L

//从数据流中抽取时间戳作为EventTime
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
val timestamp = element._2
currentMaxTimstamp = Math.max(timestamp, currentMaxTimstamp)
//计算当前watermark,为了打印出来方便观察数据,没有别的作用,watermark=currentMaxTimstamp-OutOfOrderness
val currentWatermark = currentMaxTimstamp - 10000L
//此print语句仅仅是为了在学习阶段观察数据的变化
println("key:" + element._1 + "," + "eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimstamp:[" + currentWatermark + "|" + sdf.format(currentMaxTimstamp) + "],watermark:[" + currentWatermark + "|" + sdf.format(currentWatermark) + "]")
element._2
}
})
)

//保存被丢弃的数据
val outputTag = new OutputTag[Tuple2[String,Long]]("late-data")

val resStream = waterMarkStream.keyBy(0)
//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//保存被丢弃的数据
.sideOutputLateData(outputTag)
//使用全量聚合的方式处理window中的数据
.apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val keyStr = key.toString
//将window中的数据保存到arrBuff中
val arrBuff = ArrayBuffer[Long]()
input.foreach(tup => {
arrBuff.append(tup._2)
})
//将arrBuff转换为arr
val arr = arrBuff.toArray
//对arr中的数据进行排序
Sorting.quickSort(arr)

val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//将目前window内排序后的数据,以及window的开始时间和window的结束时间打印出来,便于观察
val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) + "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
out.collect(result)
}
})

//把迟到的数据取出来,暂时打印到控制台,实际工作中可以选择存储到其它存储介质中
//例如:redis,kafka
val sideOutput = resStream.getSideOutput(outputTag)
sideOutput.print()

//将流中的结果数据也打印到控制台
resStream.print()
env.execute("WatermarkOpScala")
}
}
1
我们来验证一下,先输入这两行数据。第一条。所以你看第一次发了一个30,这是43对吧。此时,这头的mark是33。

image-20230422162002780

image-20230422162014105

1
下面呢,我们再输入几条event time小于watermark的一个时间来测试一下啊。现在你这个窗口已经执行过了,我们再往里添加数据来看一下效果。还这三条数据啊。注意它那个窗口就没有执行了。你看针对这个迟到的数据,我们就可以通过这个sideOutputLateData来保存到这个outputTag中。后期你想在保存的其他存储介质中也是没有任何问题的。

image-20230422162202360

image-20230422162306215

在多并行度下的watermark应用

1
前面呢,我们演示了在单并行度下whatmark的使用,下面呢我们来看一下在多并行度下面watermark的一个使用。咱们前面的话我们将env.setParallelism(1)。如果不设置的话。那我们在IDE中去执行的时候,默认呢,它会读取我本地的CPU的数量来设置默认并行度。那所以说我在这把这个给它注释掉。在这里加一个系统ID,这样的话我们就知道了是哪条数据被哪个线程所处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package com.imooc.scala.window

import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
* Watermark+EventTime解决数据乱序问题
* Created by xuwei
*/
object WatermarkOpMoreParallelismScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置使用数据产生的时间:EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置全局并行度为1
env.setParallelism(2)

//设置自动周期性的产生watermark,默认值为200毫秒
env.getConfig.setAutoWatermarkInterval(200)


val text = env.socketTextStream("bigdata04", 9001)
import org.apache.flink.api.scala._
//将数据转换为tuple2的形式
//第一列表示具体的数据,第二列表示是数据产生的时间戳
val tupStream = text.map(line => {
val arr = line.split(",")
(arr(0), arr(1).toLong)
})

//分配(提取)时间戳和watermark
val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)) //最大允许的数据乱序时间 10s
.withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var currentMaxTimstamp = 0L

//从数据流中抽取时间戳作为EventTime
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
val timestamp = element._2
currentMaxTimstamp = Math.max(timestamp, currentMaxTimstamp)
//计算当前watermark,为了打印出来方便观察数据,没有别的作用,watermark=currentMaxTimstamp-OutOfOrderness
val currentWatermark = currentMaxTimstamp - 10000L

val threadId = Thread.currentThread().getId
//此print语句仅仅是为了在学习阶段观察数据的变化
println("threadId:"+threadId+",key:" + element._1 + "," + "eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimstamp:[" + currentWatermark + "|" + sdf.format(currentMaxTimstamp) + "],watermark:[" + currentWatermark + "|" + sdf.format(currentWatermark) + "]")
element._2
}
})
)

waterMarkStream.keyBy(0)
//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//使用全量聚合的方式处理window中的数据
.apply(new WindowFunction[Tuple2[String,Long],String,Tuple,TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val keyStr = key.toString
//将window中的数据保存到arrBuff中
val arrBuff = ArrayBuffer[Long]()
input.foreach(tup=>{
arrBuff.append(tup._2)
})
//将arrBuff转换为arr
val arr = arrBuff.toArray
//对arr中的数据进行排序
Sorting.quickSort(arr)

val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//将目前window内排序后的数据,以及window的开始时间和window的结束时间打印出来,便于观察
val result = keyStr+","+arr.length+","+sdf.format(arr.head)+","+sdf.format(arr.last)+","+sdf.format(window.getStart)+","+sdf.format(window.getEnd)
out.collect(result)
}
}).print()

env.execute("WatermarkOpScala")
}
}
1
输入如下7条数据,发现这个window没有被触发,因为这个时候呢,这七条数据啊,都是被不同的线程处理的,每个线程呢,都有一个watermark。我们前面分析了,在这种多并行度的情况下呢,whatmark呢,它呢有一个对齐机制,它呢会取所有channel中最小的那个watermark。但是呢我们现在默认有8个并行度,你这七条数据呢,都被不同的线路所处理啊。到现在呢,还没有获取到最小的watermark。所以说呢,这个window是无法被触发执行的。

image-20230422165413793

image-20230422165427792

1
因为这个线路太多了,验证起来了,不太好验证,所以说啊这样。把这个稍微再改一下。我们也不用了一个默认的八个了,我们给它改成2个吧。这个也是多并行度了。好。接下来呢,我们往里面输这么三条数据。

image-20230422170004992

image-20230422170229823

1
第一条,没有触发。接下来,第二条,其实理论上如果是单线程的话,这个时候这个窗口已经被触发,但是现在呢,还没有触发。这第三条数据。嗯。好。看到没有,这个时候他就出发。看一下这块的一个总结。此时呢,我们会发现,当第三条数据输入完以后,这个窗口呢,它就被触发了。你前两条数据啊,输入之后呢,它获取到的那个具体的watermark是20。这个时候呢,它对应的window中呢,是没有数据的,所以说呢,什么都没有执行,当你第三条数据输入之后呢,它获取到那个最小的mark呢,就是33了,这个时候呢,它对应的窗口就是它,它里面有数据,所以说呢,这个window就触发了。

总结

1
2
下面呢,我们来针对这个watermark案例做一个总结。我们在flink中,针对这个watermark,我们该如何设置它的最大乱序时间?注意。最大乱序时间。首先第一点这个要结合我们自己的业务以及呢数据的实际情况去设置,如果OutOfOrderness设置的太小,而我们那个自身数据啊,发送时由于网络等原因导致乱序或者迟到太多,那么呢,最终的结果就是会有很多数据被丢弃。这样的话,对我们数据的正确性影响太大。
那对于这个严重乱序的数据呢?我们需要严格统计数据的最大延迟时间。这样才能最大程度保证计算数据的一个准确度。延时时间设置太小会影响数据准确性。延时时间设置太大,不仅影响数据的一个实时性。更会加重flink作业的一个负担。所以说不是对EventTime要求特别严格的数据,尽量呢不要采用这种Eventtime的方式来处理数据。

image-20230422172108445

Flink并行度

1
2
大家好,下面呢,我们来分析一下Flink的并行度。一个flink程序由多个组件组成,datasource、transformation、datasink。
一个组件呢,由多个并行的实例(线程)来执行,或者说呢,是由多个线程来执行。一个组件的并行实际数目呢?就被称之为该组件的并行度。其实就是说你这个组件有多少个线程去执行,那么它的并行度(和并行执行能力并不相等)就是多少。

taskmanager和slot之间的关系

1
那下面呢,在具体分析这个并行度之前,我们先分析一下这个taskmanager和slot之间的关系。flink的每个taskmanager为集群提供的slot的数量通常与每个task manager的可用CPU数量成正比。一般情况下的数量就是每个taskmanager的可用CPU数量。这个task manager节点就是我们集群的一个从节点。那上面这个slot数量就是这个task manager具有的一个并发执行能力。这里面啊,实行的就是具体的一些实例。source、map、keyBy、sink。还有这个图也是一样的。

image-20230422175152422

image-20230422175512133

1
2
3
Flink中每个TaskManager都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。为了控制一个worker能接收多少个task,worker通过task slot来进行控制(一个worker至少有一个task slot)。每个task slot表示TaskManager拥有资源的一个固定大小的子集。默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务(前提是它们来自同一个job)。这样的结果是,一个slot可以保存作业的整个管道。

在您描述的情况中,当并行度为1时,所有算子都在同一个slot中执行。当并行度为2时,算子被分成了三部分,其中两部分由2个slot执行,最后sink由一个单独的slot执行。这可能是因为Flink允许非资源密集型的算子和资源密集型的算子分配到同一个slot中,这样所有的slot之间任务就会平等,不会存在一直空闲一直高负载。

并行度的设置

1
2
3
4
5
6
7
那接下来我们就来看一下这个并行度该如何来设置。Flink任务的并行度可以通过4个层面来设置。
Operator Level(算子层面)
Execution Environment Level(执行环境层面)
Client Level(客户端层面)
System Level(系统层面)

那这四个层面,他们执行的优先级是什么样的?注意这些并行度优先级为Operator Level>Execution Environment Level>Client Level>System Level

image-20230422175706876

Operator Level

1
算子、数据源和目的地的并行度可以通过调用 setParallelism()方法来指定

image-20230422175809024

Execution Environment Level

1
2
3
这个执行环境层面的。主要呢,是在这个ENV后面来设置一个并行度。这设置的是一个全局的并行度。当然,你也可以选择在下面针对某一个算子再去改它的并行度也是可以的。因为你那个算子层面并行度是大于这个执行环境层面这个并行度的。

注意:执行环境的并行度可以通过显式设置算子的并行度而被重写。

image-20230422175856304

Client Level

1
接下来是一个客户端层面。这个并行度呢,可以在客户端提交Job的时候来设定。通过那个-P参数来动态指令就可以了。具体呢,是这样的。

image-20230422180247995

System Level

1
那最后呢,是这个系统层面了。我们在系统层面可以通过配置flink-conf.yaml文件里面parallelism.default属性来指定所有执行环境的默认并行度啊,当然了,你是可以在具体的任务里面再去动态的去改这个并行度。因为他们呢,可以覆盖这个系统层面的并行度。

并行度案例分析

1
下面呢,我们来通过一些案例来具体分析一下Flink中的并行度。首先看这个图。这个图里面呢,它表示啊,我们这个集群是有三个从节点。M1,M2,M3,注意每个节点上面具有三个slot。这个表示这个从节点,它具有的3个并发处理能力。那如何实现三个呢?在这个flink-conf.yaml里面来配置了taskmanager.numberOfTaskSlots,把它设置为3。这样话相当于我这个节点上面有三个空闲CPU。那这样的话,我这个集群啊,目前具有的一个处理能力就是9 slot

image-20230422181155611

1
第一个案例,它的并行度为1,那如何让它的并行度为1呢?很简单,你在提交这个任务的时候,什么参数都不设置就行。并且我们在开发这个word代码的时候,里面啊,也不设置并行度相关的代码,这样就可以了,这样它就会默认呢,读取这个flink-conf.yaml里面的parallelism的值。这个参数的默认值为1。

image-20230422181742166

1
第二个案例,如何实现让它的并行度为2呢?你可以通过这几种方式,首先呢,去改flink-conf.yaml把里面这个默认参数值改为二,或者说我们在动态提交的时候通过-P来指定。或者我们通过这个env来设置都是可以的。

image-20230422182315555

image-20230422182700747

1
第三个案例,它的并行度为9,那如何实现呢?你要么在这个配置文件flink-conf.yaml里面,把这个参数设置为9,要么呢-p动态指定。要么呢,通过env来设置都是可以的。这样的话,它就是9份了。这样就占满了,那说我能不能把这个并行度设为10呢?不能,因为你现在最终呢,只有九个slot。这个需要注意啊。

image-20230422183105341

1
第四个。我看呀,它这个并行度呢,还是9,但是注意针对这个sink组件的并行度啊,给它设置为1啊。我们在这主要分析一下这个新的组件并行度,全局设置为9,就是根据咱们前面这个案例。这三种你用哪种都可以。但是呢,我们还需要把这个新的组件并行度设置为1,那怎么设置呢?就说你在代码里面啊,通过算式层面来把这个新组件的并行度设置为1,这样的话它就会覆盖那个全局的那个9。当然你其他组件还是按那个九那个并行度去执行,而我这个组件的话,我在这给它覆盖掉,使用一给它覆盖掉。

本文标题:大数据开发工程师-第十七周 Flink极速上手篇-Flink高级进阶之路-1

文章作者:TTYONG

发布时间:2023年04月20日 - 16:04

最后更新:2023年05月31日 - 11:05

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%83%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-Flink%E9%AB%98%E7%BA%A7%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF-1.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%