第十七周 Flink极速上手篇-Flink高级进阶之路-1
Window的概念和类型
1 | 大家好,前面我们学习了flink中的基本概念,集群部署以及核心API的使用,下面我们来学习一下flink中的高级特性的使用。首先,我们需要掌握中的window、time以及whatermark使用。然后我们需要掌握kafka-connector使用,这个是针对kafka一个专题。最后我们会学习一下Spark中的流式计算sparkStreaming,之前在学习spark的时候我们没有涉及这块,在这儿我们和flink一块来学习,可以加深理解,因为它们都是流式计算引擎。 |

1 | 下面来看这个滑动窗口,这个S轴呢,还是一个时间轴,你看这个是一个window的大小。这个表示是一个window的滑动间隔,这是WINDOW1这个红色的,它这个窗口从这到这儿,下面这个呢,WINDOW2,注意这个窗口它是从这儿到这儿,这个蓝色的看到没有,它里面呢,包含了WINDOW1里面的一部分数据。那你看WINDOW3 window3里面它包含了WINDOW2里面的一部分数据,所以说这个滑动窗口,它们每个窗口之间呀,会有数据重叠,这个就这两种窗口它的一个区别 |

1 | 下面我针对这个窗口的类型做了一个汇总。你看这是window window下面有time window有count window还有自定义window,那这些window再往下面你看它呢,可以实现滚动窗口或者滑动窗口,对吧?不管你是基于time的,还是基于count的,还是自定义的,你们都可以实现滚动窗口或者是滑动窗口。 |

TimeWindow的使用
scala
1 | 下面呢,我们来看一下这些window的具体应用,首先来看第一个time window。time window呢是根据时间对数据流切分窗口,time window可以支持滚动窗口和滑动窗口。 |
1 | package com.imooc.scala.window |
1 | timeWindow滚动窗口 |


1 | timeWindow滑动窗口,黑色第一次输入,蓝色第二次输入 |

1 | 第三次打印,蓝色 |

java
1 | package com.imooc.java.window; |
CountWindow的使用
1 | 下面呢,我们来看一下count的Window的使用,count Window是根据元素个数对数据流切分窗口。count window也可以支持滚动窗口和滑动窗口。 |
scala
1 | package com.imooc.scala.window |
1 | 滚动窗口 |


1 | 所以我在这再输三个,看到没有,到这儿才刚开始执行了一次,他把这个hello打印出来五个。那这个you和me为什么没有打印呢?注意了,所以啊,我们在这啊执行了keyby会对这个数据进行分组,如果某个分组对应的数据窗口内达到了五个元素,这个窗口才会被处罚执行,所以说这个时候相当于是hello对应的那个窗口,它里面够五个元素了,它才会执行。 |


1 | 看一下count滑动窗口执行结果 |








java
1 | package com.imooc.java.window; |
自定义Window的使用
1 | 下面呢,我们来看一下自定义window。其实呢,window还可以再细分一下。可以把它分为呢,一种是基于Key的window。一种是不基于Key的window。其实就是说咱们在使用window之前是否执行了key操作啊,咱们前面演示的都是这种基于Key的window。你看我们在做window之前,前面呢都做了Keyby对吧,那如果呢,需求中不需要根据Key进行分组,你在使用window的时候啊,我们需要对应的去使用那个timeWindowAll和countWindowAll。 |

1 | 你使用KeyBy之后的话,它就只能调那个timeWindow,countWindow(这两个也可以用window实现),这个需要注意一下啊,那如果说是我们自定义的window。如何使用呢?对吧,针对这两种情况。来看一下。针对这个基于Key的window呀,我们需要使用这个window函数 |
scala
1 | val input: DataStream[T] = ... |
1 | package com.imooc.scala.window |
1 | 这个呢,和咱们之前啊使用的什么timewindow那个效果是一样的。这样的话更加灵活一些,我们想怎么定义都可以啊。如果你不使用这个KeyBy的话,那下面你就可以使用windowAll是一样的效果 |
java
1 | package com.imooc.java.window; |
Window中的增量聚合和全量聚合
增量聚合
1 | 下面呢,我们来看一下Window聚合。就是在进行Window聚合操作的时候呢,可以分为两种情况。一种呢是增量聚合,还有一种是全量聚合。 |

1 | 它的具体执行过程是这样的。第一次进来一条数据,则立刻进行累加,求和结果为八,第二次进来一条数据12,则立刻进行累加,求和结果为20。第三次进来一条数据七,则立刻进行累加求和,结果为27。第四次进来一条数据,则立刻进行累加求和,结果为37。这就是这个增量聚合它的一个执行流程。 |
全量聚合
1 | 那下面呢,我们来看一下全量集合。全量集合呀,它就是等属于窗口的数据都到齐了,才开始进行聚合计算,可以实现对窗口内的数据进行排序等需求。常见的一些全量聚合函数为: |

1 | 第四次进来调数据10,此时窗口触发,这时候才会对窗口内的数据进行排序,然后获取最大值。 |
全量聚合apply

1 | 下面呢,我们来看一下这个apply函数的一个使用。从这你可以看出来,他接触的是一个iterable,可以认为是一个集合。他可以把这个窗口的数据啊,一次性全都传过来,当这个窗口触发的时候,才会真正执行这个代码。 |
全量聚合process

1 | 下面呢是一个process。你看他接触的也是一个iterable,所以说呢,你在这里面就可以获取到这个窗口里面的所有数据了。 |
1 | 这个呢就是Windows中的全量聚合和增量聚合,后面呢我们就会用到这个apply,还有process它的一个使用,因为有时候我们需要对这个窗口内的所有数据去做一些全量的操作,这样的话就不能用这种增量聚合,而要用这种全量聚合。 |
Flink中的Time
1 | 针对流数据的time可以分为以下三种。第一个Event Time表示事件产生的时间,它通常由事件中的时间戳来描述。第二个ingestion time表示事件进入flink的时间。第三个processing time,它表示事件被处理时当前系统的时间,那这几种时间呀,我们通过这个图可以很清晰的看出来它们之间的关系。 |

1 | 首先是even time,这个就是数据产生的时间。第二个是ingestion time表示呢,他进入flink时间,其实就是被那个source把它读取过来那个时间。第三个呢,是这个processing time,它其实呢,就是flink里面具体的算子,在处理的时候它的一个时间,那接下来我们来看一个案例。 |

1 | 注意你看数据呢,是在十点的时候产生的。结果呢,在晚上八点的时候才被flink读取走。那flink真正在处理的时候呢?是8.02秒。 |
Watermark的分析
背景

1 | 实时计算中,数据时间比较敏感。有eventTime和processTime区分,一般来说eventTime是从原始的消息中提取过来的,processTime是Flink自己提供的,Flink中一个亮点就是可以基于eventTime计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime显然是不合理的。 |
1 | 前面提到了Time的概念,如果我们使用Processing Time,那么在Flink消费数据的时候,它完全不需要关心数据本身的时间,意思也就是说不需要关心数据到底是延迟数据还是乱序数据。因为Processing Time只是代表数据在Flink被处理时的时间,这个时间是顺序的。 |

1 | 然而在有些场景下,尤其是特别依赖于事件时间而不是处理时间,比如: |
1 | 统计8:00 ~ 9:00这个时间段打开淘宝App的用户数量,Flink这边可以开个窗口做聚合操作,但是由于网络的抖动或者应用采集数据发送延迟等问题,于是无法保证在窗口时间结束的那一刻窗口中是否已经收集好了在8:00 ~ 9:00中用户打开App的事件数据,但又不能无限期的等下去? |
概念
1 | watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。 |
1 | 比如: |
WaterMark的传递
1 | Watermark在向下游传递时,是广播到下游所有的子任务中,如果多并行度下有多个watermark传递到下游时,取最小的watermark。 |
WaterMark设置
1 | 注:如果你采用的是事件时间,即你设置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); |
1 | 实际生产中用第二种的比较多,它会周期性产生Watermark的方式,但是必须结合时间或者积累条数两个维度,否则在极端情况下会有很大的延时。 |
1 | 通常情况下,在接收到Source的数据后,应该立刻生成Watermark,但是也可以在使用Map或者Filter操作之后,再生成Watermark。 |
开发Watermark代码
乱序数据处理(数据有序)
1 | 前面我们学习了whatermark的一些基本原理,可能大家对它还不够了解,下面我们来通过这个案例加深大家对whatermark的理解。我们来分析一下这个案例。乱序数据处理 |
1 | package com.imooc.scala.window |
根据数据跟踪观察Watermark








1 | 到这里,window仍然没有被触发,此时watermark的时间已经等于第一条数据的eventtime了 |


1 | window仍然没有被触发,此时,我们数据已经发送到2026-10-01 10:11:33了,根据eventtime来算,最早的数据已经过去了11s了,window还没开始计算,那到底什么时候会触发window呢? |



1 | 到这里,我们做了一个说明。 |


1 | window的设定无关数据本身,而是系统定义好了的。 |


1 | 此时,watermark时间虽然已经等于第二条数据的时间,但是由于其没有达到第二条数据所在window,但是由于其没有达到第二条数据所在window的结束时间,所以window并没有被触发。那么,第二条数据所在的window时间区间如下。 |
1 | 也就是说,我们必须输入一个10:11:37的数据,第二条数据所在的window才会被触发,我们继续输入。 |

1 | 此时,我们已经看到,window的触发条件要符合以下几个条件: |
Watermark+EventTime(处理乱序数据)
1 | 我们前面那个测试啊,数据呢,都是按照这个时间顺序递增的,都是有序的,那现在呢,我们来输入一些的数据,来看看这个whatmark,结合这个一的eventtime机制是如何处理这些乱写数据的。那我们在上面那个基础之上啊,再输入两行数据。 |


1 | 我们再输入一条43秒的数据 |



1 | 大家注意他没有这个33的。我这个窗口呢,是[30,33),你看我这个一的代表里面数据是有这个33的,为什么这个33没有输出呢。因为这个窗口啊,它是一个左闭右开的。那这个33的话,它其实啊,属于下一个窗口,就是33到36的那个窗口。 |
延时数据的三种处理方式
丢弃(默认)
1 | 下面呢,我们就来看一下针对迟到太久的数据,它的一些处理方案,现在呢一共有三种。 |

1 | 再输几个EventTime小于Watermark的时间 |

1 | 看到没有,这三条他都没有触发这个窗口的执行啊,因为你现在你输入的数据所在的窗口已经执行过了。flink默认对这些迟到的数据的处理方案就是丢弃。这几条数据,30对应的那个窗口数据是不是已经执行过了呀,那这样过来它直接丢弃,这是默认的一个处理方案。 |
allowedLateness
1 | 那接下来看第二种,你可以通过这个allowedLateness来指定一个允许数据延迟的时间。 |
1 | package com.imooc.scala.window |
1 | 这里又是重启之后了 |


1 | 那下面呢,我们再输入几条eventtime小于watermark的一个数据来验证一下效果。好,来看一下。注意你会发现,你看。这三条数据过来的时候,窗口同样被触发了,因为之前的话,我们是这个30到33这个窗口对吧。我在这输的这三条数据,一个是30秒了,31、32,它们都属于那个窗口。岁数,你看窗口都被吃光。你看这时候打印的窗口数据是两条,这是三条,这是四条对吧。所以说呢,每条数据都触发了window的执行啊。这三条数据。那下面我们再输一条数据。 |



1 | 这时候啊,我们把这个whatmark呀,给它调到34。往上面调一下。看到没有,这次呢,它是没有触发的啊是34。数据呢是44,这样的话,whatmark变成了34。 |


1 | 此时呢,把whatmark上升到了34。此时呢,我们再输入几条这种迟到的数据来验证一下效果。因为刚才的话,我们验证了它是可以执行的啊。嗯。结果你会发现,看到没有,这三条又执行了。我们发现数的这三条数据呢,它都触发了window执行。 |


1 | 下面我们再输入一行数据,把watermark调到35。 |

1 | 注意下面我们再输入Eventtime<Watermark的数据,还是那个三十三十一三十二啊。嗯。 |


1 | 注意此时这个窗口就不会触发,相当于啊,你这个时候你这个迟到的数据啊,我就不管了。 |
总结



sideOutputLateData
1 | 下面呢,还有一种处理方案。这个呢,就是收集迟到数据。通过这个函数呢,可以把迟到的数据啊,给它统一收集,统一存储,方便后期排查 |
1 | package com.imooc.scala.window |
1 | 我们来验证一下,先输入这两行数据。第一条。所以你看第一次发了一个30,这是43对吧。此时,这头的mark是33。 |


1 | 下面呢,我们再输入几条event time小于watermark的一个时间来测试一下啊。现在你这个窗口已经执行过了,我们再往里添加数据来看一下效果。还这三条数据啊。注意它那个窗口就没有执行了。你看针对这个迟到的数据,我们就可以通过这个sideOutputLateData来保存到这个outputTag中。后期你想在保存的其他存储介质中也是没有任何问题的。 |


在多并行度下的watermark应用
1 | 前面呢,我们演示了在单并行度下whatmark的使用,下面呢我们来看一下在多并行度下面watermark的一个使用。咱们前面的话我们将env.setParallelism(1)。如果不设置的话。那我们在IDE中去执行的时候,默认呢,它会读取我本地的CPU的数量来设置默认并行度。那所以说我在这把这个给它注释掉。在这里加一个系统ID,这样的话我们就知道了是哪条数据被哪个线程所处理。 |
1 | package com.imooc.scala.window |
1 | 输入如下7条数据,发现这个window没有被触发,因为这个时候呢,这七条数据啊,都是被不同的线程处理的,每个线程呢,都有一个watermark。我们前面分析了,在这种多并行度的情况下呢,whatmark呢,它呢有一个对齐机制,它呢会取所有channel中最小的那个watermark。但是呢我们现在默认有8个并行度,你这七条数据呢,都被不同的线路所处理啊。到现在呢,还没有获取到最小的watermark。所以说呢,这个window是无法被触发执行的。 |


1 | 因为这个线路太多了,验证起来了,不太好验证,所以说啊这样。把这个稍微再改一下。我们也不用了一个默认的八个了,我们给它改成2个吧。这个也是多并行度了。好。接下来呢,我们往里面输这么三条数据。 |


1 | 第一条,没有触发。接下来,第二条,其实理论上如果是单线程的话,这个时候这个窗口已经被触发,但是现在呢,还没有触发。这第三条数据。嗯。好。看到没有,这个时候他就出发。看一下这块的一个总结。此时呢,我们会发现,当第三条数据输入完以后,这个窗口呢,它就被触发了。你前两条数据啊,输入之后呢,它获取到的那个具体的watermark是20。这个时候呢,它对应的window中呢,是没有数据的,所以说呢,什么都没有执行,当你第三条数据输入之后呢,它获取到那个最小的mark呢,就是33了,这个时候呢,它对应的窗口就是它,它里面有数据,所以说呢,这个window就触发了。 |
总结
1 | 下面呢,我们来针对这个watermark案例做一个总结。我们在flink中,针对这个watermark,我们该如何设置它的最大乱序时间?注意。最大乱序时间。首先第一点这个要结合我们自己的业务以及呢数据的实际情况去设置,如果OutOfOrderness设置的太小,而我们那个自身数据啊,发送时由于网络等原因导致乱序或者迟到太多,那么呢,最终的结果就是会有很多数据被丢弃。这样的话,对我们数据的正确性影响太大。 |

Flink并行度
1 | 大家好,下面呢,我们来分析一下Flink的并行度。一个flink程序由多个组件组成,datasource、transformation、datasink。 |
taskmanager和slot之间的关系
1 | 那下面呢,在具体分析这个并行度之前,我们先分析一下这个taskmanager和slot之间的关系。flink的每个taskmanager为集群提供的slot的数量通常与每个task manager的可用CPU数量成正比。一般情况下的数量就是每个taskmanager的可用CPU数量。这个task manager节点就是我们集群的一个从节点。那上面这个slot数量就是这个task manager具有的一个并发执行能力。这里面啊,实行的就是具体的一些实例。source、map、keyBy、sink。还有这个图也是一样的。 |


1 | Flink中每个TaskManager都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。为了控制一个worker能接收多少个task,worker通过task slot来进行控制(一个worker至少有一个task slot)。每个task slot表示TaskManager拥有资源的一个固定大小的子集。默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务(前提是它们来自同一个job)。这样的结果是,一个slot可以保存作业的整个管道。 |
并行度的设置
1 | 那接下来我们就来看一下这个并行度该如何来设置。Flink任务的并行度可以通过4个层面来设置。 |

Operator Level
1 | 算子、数据源和目的地的并行度可以通过调用 setParallelism()方法来指定 |

Execution Environment Level
1 | 这个执行环境层面的。主要呢,是在这个ENV后面来设置一个并行度。这设置的是一个全局的并行度。当然,你也可以选择在下面针对某一个算子再去改它的并行度也是可以的。因为你那个算子层面并行度是大于这个执行环境层面这个并行度的。 |

Client Level
1 | 接下来是一个客户端层面。这个并行度呢,可以在客户端提交Job的时候来设定。通过那个-P参数来动态指令就可以了。具体呢,是这样的。 |

System Level
1 | 那最后呢,是这个系统层面了。我们在系统层面可以通过配置flink-conf.yaml文件里面parallelism.default属性来指定所有执行环境的默认并行度啊,当然了,你是可以在具体的任务里面再去动态的去改这个并行度。因为他们呢,可以覆盖这个系统层面的并行度。 |
并行度案例分析
1 | 下面呢,我们来通过一些案例来具体分析一下Flink中的并行度。首先看这个图。这个图里面呢,它表示啊,我们这个集群是有三个从节点。M1,M2,M3,注意每个节点上面具有三个slot。这个表示这个从节点,它具有的3个并发处理能力。那如何实现三个呢?在这个flink-conf.yaml里面来配置了taskmanager.numberOfTaskSlots,把它设置为3。这样话相当于我这个节点上面有三个空闲CPU。那这样的话,我这个集群啊,目前具有的一个处理能力就是9 slot |

1 | 第一个案例,它的并行度为1,那如何让它的并行度为1呢?很简单,你在提交这个任务的时候,什么参数都不设置就行。并且我们在开发这个word代码的时候,里面啊,也不设置并行度相关的代码,这样就可以了,这样它就会默认呢,读取这个flink-conf.yaml里面的parallelism的值。这个参数的默认值为1。 |

1 | 第二个案例,如何实现让它的并行度为2呢?你可以通过这几种方式,首先呢,去改flink-conf.yaml把里面这个默认参数值改为二,或者说我们在动态提交的时候通过-P来指定。或者我们通过这个env来设置都是可以的。 |


1 | 第三个案例,它的并行度为9,那如何实现呢?你要么在这个配置文件flink-conf.yaml里面,把这个参数设置为9,要么呢-p动态指定。要么呢,通过env来设置都是可以的。这样的话,它就是9份了。这样就占满了,那说我能不能把这个并行度设为10呢?不能,因为你现在最终呢,只有九个slot。这个需要注意啊。 |

1 | 第四个。我看呀,它这个并行度呢,还是9,但是注意针对这个sink组件的并行度啊,给它设置为1啊。我们在这主要分析一下这个新的组件并行度,全局设置为9,就是根据咱们前面这个案例。这三种你用哪种都可以。但是呢,我们还需要把这个新的组件并行度设置为1,那怎么设置呢?就说你在代码里面啊,通过算式层面来把这个新组件的并行度设置为1,这样的话它就会覆盖那个全局的那个9。当然你其他组件还是按那个九那个并行度去执行,而我这个组件的话,我在这给它覆盖掉,使用一给它覆盖掉。 |