大数据开发工程师-第十六周 Flink极速上手篇-实战:流处理和批处理程序开发-2


第十六周 Flink极速上手篇-实战:流处理和批处理程序开发-2

Flink快速上手使用

1
2
3
创建maven项目,因为要使用scala编写代码,在src里main里除了java目录,还要创建scala目录,再创建包

setting里的module里的scala sdk要导入

image-20230408154825118

1
接下来在pom.xml 中引入flink相关依赖,前面两个是针对java代码的,后面两个是针对scala代码的,最后一个依赖是这对flink1.11这个版本需要添加的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<dependency> 
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.1</version>
</dependency>
1
2
3
4
5
6
7
8
在开发Flink程序之前,我们先来梳理一下开发一个Flink程序的步骤
1:获得一个执行环境
2:加载/创建 初始化数据
3:指定操作数据的transaction算子
4:指定数据目的地
5:调用execute()触发执行程序

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序和Spark类似,Spark中是必须要有action算子才会真正执行。

Streaming WordCount

1
2
3
4
5
需求
通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来
代码开发
下面我们就来开发第一个Flink程序。
先使用scala代码开发

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.imooc.scala
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* 需求:通过Socket实时产生一些单词,
* 使用Flink实时接收数据
* 对指定时间窗口内(例如:2秒)的数据进行聚合统计
* 并且把时间窗口内计算的结果打印出来
* Created by xuwei
*/
object SocketWindowWordCountScala {
/**
* 注意:在执行代码之前,需要先在bigdata04机器上开启socket,端口为9001
* @param args
*/
def main(args: Array[String]): Unit = {
//获取运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//连接socket获取输入数据
val text = env.socketTextStream("bigdata04", 9001)
//处理数据
//注意:必须要添加这一行隐式转换的代码,否则下面的flatMap方法会报错
import org.apache.flink.api.scala._
val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词
.map((_,1))//每一个单词转换为tuple2的形式(单词,1)
//.keyBy(0)//根据tuple2中的第一列进行分组
.keyBy(tup=>tup._1)//官方推荐使用keyselector选择器选择数据
.timeWindow(Time.seconds(2))//时间窗口为2秒,表示每隔2秒钟计算一次接收到的数据
.sum(1)// 使用sum或者reduce都可以
//.reduce((t1,t2)=>(t1._1,t1._2+t2._2))
//使用一个线程执行打印操作
wordCount.print().setParallelism(1)
//执行程序
env.execute("SocketWindowWordCountScala")
}
}
1
注意:在idea等开发工具里面运行代码的时候需要把pom.xml中的scope配置注释掉

image-20230408222314468

1
2
3
4
5
在bigdata04上面开启socket
[root@bigdata04 ~]# nc -l 9001
hello you
hello me
hello you hello me

image-20230408222535380

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
注意:此时代码执行的时候下面会显示一些红色的log4j的警告信息,提示缺少相关依赖和配置

将log4j.properties配置文件和log4j的相关maven配置添加到pom.xml文件中

<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
</dependency>

image-20230408222823964

1
此时再执行就没有红色的警告信息了,但是使用info日志级别打印的信息太多了,所以将log4j中的日志级别配置改为error级别

image-20230408222903540

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.imooc.java;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 需求:通过socket实时产生一些单词
* 使用Flink实时接收数据
* 对指定时间窗口内(例如:2秒)的数据进行聚合统计
* 并且把时间窗口内计算的结果打印出来
* Created by xuwei
*/
public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception{
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入数据
DataStreamSource<String> text = env.socketTextStream("bigdata04", 9001);
//处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = text.flatMap(new FlatMapFunction<String,String>(){
public void flatMap(String line, Collector<String> out) throws Exception{
String[] words = line.split(" ");
for (String word : words) {
out.collect(word);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> map(String word) throws Exception
return new Tuple2<String, Integer>(word, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
public String getKey(Tuple2<String, Integer> tup) throws Exception{
return tup.f0;
}
})//.keyBy(0)
.timeWindow(Time.seconds(2))
.sum(1);
//使用一个线程执行打印操作
wordCount.print().setParallelism(1);
//执行程序
env.execute("SocketWindowWordCountJava");
}
}

Batch WordCount

1
2
需求:统计指定文件中单词出现的总次数
下面来开发Flink的批处理代码

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.imooc.scala
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* 需求:统计指定文件中单词出现的总次数
* Created by xuwei
*/
object BatchWordCountScala {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
val inputPath = "hdfs://bigdata01:9000/hello.txt"
val outPath = "hdfs://bigdata01:9000/out"
//读取文件中的数据
val text = env.readTextFile(inputPath)
//处理数据
import org.apache.flink.api.scala._
val wordCount = text.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0)
.sum(1)
.setParallelism(1) // 这里设置并行度是为了将所有数据写到一个文件里,查看比较方便
//将结果数据保存到文件中
wordCount.writeAsCsv(outPath,"\n"," ")
//执行程序
env.execute("BatchWordCountScala")
}
}
1
2
注意:这里面执行setParallelism(1)设置并行度为1是为了将所有数据写到一个文件里面,我们查看结果的时候比较方便(此时输出路径会变成一个文件)
还有就是flink在windows中执行代码,使用到hadoop的时候,需要将hadoop-client的依赖添加到项目中,否则会提示不支持hdfs这种文件系统。
1
2
3
4
5
6
7
8
9
10
在pom.xml文件中增加

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>

此时执行代码就可以正常执行了。
执行成功之后到hdfs上查看结果

image-20230408225649895

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.imooc.java;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* 需求:统计指定文件中单词出现的总次数
* Created by xuwei
*/
public class BatchWordCountJava {
public static void main(String[] args) throws Exception{
//获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment
String inputPath = "hdfs://bigdata01:9000/hello.txt";
String outPath = "hdfs://bigdata01:9000/out2";
//读取文件中的数据
DataSource<String> text = env.readTextFile(inputPath);
//处理数据(这里flatmap之后,没写map,直接一步到位)
DataSet<Tuple2<String, Integer>> wordCount = text.flatMap(new FlatMapFunction<String,Tuple2<String,Integer>>(){
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}).groupBy(0)
.sum(1)
.setParallelism(1);
//将结果数据保存到文件中
wordCount.writeAsCsv(outPath,"\n"," ");
//执行程序
env.execute("BatchWordCountJava");
}
}
1
2
3
4
5
6
7
流处理Streaming
执行环境:StreamExecutionEnvironment
数据类型:DataStream

批处理Batch
执行环境:ExecutionEnvironment
数据类型:DataSet

Flink集群安装部署

1
2
3
4
5
6
7
8
Flink支持多种安装部署方式
Standalone
ON YARN
Mesos、Kubernetes、AWS…
这些安装方式我们主要讲一下standalone和on yarn。
如果是一个独立环境的话,可能会用到standalone集群模式。
在生产环境下一般还是用on yarn 这种模式比较多,因为这样可以综合利用集群资源。和我们之前讲的spark on yarn是一样的效果
这个时候我们的Hadoop集群上面既可以运行MapReduce任务,Spark任务,还可以运行Flink任务,一举三得。

standalone

1
2
下面我们来看一下standalone模式
它的架构是这样的

image-20230408232348677

1
2
3
4
5
6
7
8
9
依赖环境
jdk1.8及以上【配置JAVA_HOME环境变量】
ssh免密码登录
在这我们使用bigdata01、02、03这三台机器,这几台机器的基础环境都是ok的,可以直接使用。

集群规划如下:
master:bigdata01
slave:bigdata02、bigdata03
接下来我们需要先去下载Flink的安装包。

image-20230408232658079

1
2
由于目前Flink各个版本之间差异比较大,属于快速迭代阶段,所以在这我们就使用最新版本了,使用Flink1.11.1版本。
安装包下载好以后上传到bigdata01的/data/soft目录中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
下面开始安装Flink集群
1:解压

2:修改配置
[root@bigdata01 soft]# cd flink-1.11.1
[root@bigdata01 flink-1.11.1]# cd conf/
[root@bigdata01 conf]# vi flink-conf.yaml
......
jobmanager.rpc.address: bigdata01
......
[root@bigdata01 conf]# vi masters
bigdata01:8081
[root@bigdata01 conf]# vi workers
bigdata02
bigdata03
1
2
3
4
5
6
7
8
9
10
11
3:将修改完配置的flink目录拷贝到其它两个从节点
[root@bigdata01 soft]# scp -rq flink-1.11.1 bigdata02:/data/soft/
[root@bigdata01 soft]# scp -rq flink-1.11.1 bigdata03:/data/soft/

4:启动Flink集群
[root@bigdata01 soft]# cd flink-1.11.1
[root@bigdata01 flink-1.11.1]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bigdata01.
Starting taskexecutor daemon on host bigdata02.
Starting taskexecutor daemon on host bigdata03.
1
2
3
4
5:验证一下进程
在bigdata01上执行jps
[root@bigdata01 flink-1.11.1]# jps
3986 StandaloneSessionClusterEntrypoint
1
2
3
4
5
6
在bigdata02上执行jps
在bigdata03上执行jps
[root@bigdata02 ~]# jps
2149 TaskManagerRunner
[root@bigdata03 ~]# jps
2150 TaskManagerRunner
1
2
3
4
5
6
7
6:访问Flink的web界面
http://bigdata01:8081
7:停止集群,在主节点上执行停止集群脚本
[root@bigdata01 flink-1.11.1]# bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 2149) on host bigdata02.
Stopping taskexecutor daemon (pid: 2150) on host bigdata03.
Stopping standalonesession daemon (pid: 3986) on host bigdata01.

Standalone集群核心参数

1
2
3
4
5
参数 									解释
jobmanager.memory .process.size 主节点可用内存大小
taskmanager.memory.process.size 从节点可用内存大小
taskmanager.numberOfTaskSlots 从节点可以启动的进程数量,建议设置为从节可用的cpu数量
parallelism.default Flink任务的默认并行度
1
2
3
1:slot是静态的概念,是指taskmanager具有的并发执行能力
2:parallelism是动态的概念,是指程序运行时实际使用的并发能力
3:设置合适的parallelism能提高程序计算效率,太多了和太少了都不好
1
2
3
4
Flink ON YARN模式就是使用客户端的方式,直接向Hadoop集群提交任务即可。不需要单独启动Flink进程。
注意:
1:Flink ON YARN 模式依赖Hadoop 2.4.1及以上版本
2:Flink ON YARN支持两种使用方式

image-20230408234110607

1
在工作中建议使用第二种方式。
1
2
3
4
5
6
下面来看一下第一种方式
第一步:在集群中初始化一个长时间运行的Flink集群
使用yarn-session.sh脚本
第二步:使用flink run命令向Flink集群中提交任务

注意:使用flink on yarn需要确保hadoop集群已经启动成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
下面来具体演示一下
首先在bigdata04机器上安装一个Flink客户端,其实就是把Flink的安装包上传上去解压即可,不需要启动

接下来在执行yarn-session.sh脚本之前我们需要先设置HADOOP_CLASSPATH这个环境变量,否则,执行
yarn-session.sh是会报错的,提示找不到hadoop的一些依赖。

在/etc/profile中配置HADOOP_CLASSPATH
[root@bigdata04 flink-1.11.1]# vi /etc/profile
export JAVA_HOME=/data/soft/jdk1.8
export HADOOP_HOME=/data/soft/hadoop-3.2.0
export HIVE_HOME=/data/soft/apache-hive-3.1.2-bin
export SPARK_HOME=/data/soft/spark-2.4.3-bin-hadoop2.7
export SQOOP_HOME=/data/soft/sqoop-1.4.7.bin__hadoop-2.6.0
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HO
ME/bin:$SQOOP_HOME/bin:$PATH

image-20230408234907861

1
2
接下来,使用yarn-session.sh在YARN中创建一个长时间运行的Flink集群
[root@bigdata04 flink-1.11.1]# bin/yarn-session.sh -jm 1024m -tm 1024m -d
1
2
这个表示创建一个Flink集群,-jm是指定主节点的内存,-tm是指定从节点的内存,-d是表示把这个进程放到后台去执行。
启动之后,会看到类似这样的日志信息,这里面会显示flink web界面的地址,以及这个flink集群在yarn中对应的applicationid。

image-20230408234705575

1
此时到YARN的web界面中确实可以看到这个flink集群。

image-20230408235222041

image-20230408235427696

1
可以使用屏幕中显示的flink的web地址或者yarn中这个链接都是可以进入这个flink的web界面的

image-20230408235310889

1
2
接下来向这个Flink集群中提交任务,此时使用Flink中的内置案例
[root@bigdata04 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar
1
注意:这个时候我们使用flink run的时候,它会默认找这个文件,然后根据这个文件找到刚才我们创建的那个永久的Flink集群,这个文件里面保存的就是刚才启动的那个Flink集群在YARN中对应的applicationid。

image-20230408235700425

image-20230408235714610

1
任务提交上去执行完成之后,再来看flink的web界面,发现这里面有一个已经执行结束的任务了。

image-20230408235924462

1
注意:这个任务在执行的时候,会动态申请一些资源执行任务,任务执行完毕之后,对应的资源会自动释放掉。
1
2
3
4
5
最后把这个Flink集群停掉,使用yarn的kill命令
[root@bigdata04 flink-1.11.1]# yarn application -kill application_17689063095
2026-01-20 23:25:22,548 INFO client.RMProxy: Connecting to ResourceManager at
Killing application application_1768906309581_0005
2026-01-20 23:25:23,239 INFO impl.YarnClientImpl: Killed application_17689063
1
针对yarn-session命令,它后面还支持一些其它参数,可以在后面传一个-help参数

image-20230409000251119

1
注意:这里的-j是指定Flink任务的jar包,此参数可以省略不写也可以
1
2
3
4
5
6
flink run -m yarn-cluster(创建Flink集群+提交任务)
使用flink run直接创建一个临时的Flink集群,并且提交任务
此时这里面的参数前面加上了一个y参数
[root@bigdata04 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

提交上去之后,会先创建一个Flink集群,然后在这个Flink集群中执行任务。

image-20230409000904789

1
针对Flink命令的一些用法汇总

image-20230409001026836

1
2
1:提高大数据集群机器的利用率
2:一套集群,可以执行MR任务,Spark任务,Flink任务等

向集群中提交Flink任务

1
2
3
接下来我们希望把前面我们自己开发的Flink任务提交到集群上面,在这我就使用flink on yarn的第二种方式来向集群提交一个Flink任务。

第一步:在pom.xml中添加打包配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1
注意:需要将Flink和Hadoop的相关依赖的score属性设置为provided,这些依赖不需要打进jar包里面。
1
2
3
4
5
6
7
8
第二步:生成jar包: mvn clean package -DskipTests
第三步:将 db_flink-1.0-SNAPSHOT-jar-with-dependencies.jar 上传到bigdata04机器上的 /data/sof
t/flink-1.11.1 目录中(上传到哪个目录都可以)

第四步:提交Flink任务
注意:提交任务之前,先开启socket
[root@bigdata04 ~]# nc -l 9001
[root@bigdata04 flink-1.11.1]# bin/flink run -m yarn-cluster -c com.imooc.scala.SocketWindowWordCountScala -yjm 1024 -ytm 1024 db_flink-1.0-SNAPSHOT-jar-with-dependencies.jar
1
此时到yarn上面可以看到确实新增了一个任务,点击进去可以看到flink的web界面

image-20230409002811696

1
通过socket输入一串内容

image-20230409002838306

image-20230409002909775

image-20230409002934711

image-20230409003014134

image-20230409003040435

image-20230409003059283

1
2
接下来我们希望把这个任务停掉,因为这个任务是一个流处理的任务,提交成功之后,它会一直运行。
注意:此时如果我们使用ctrl+c关掉之前提交任务的那个进程,这里的flink任务是不会有任何影响的,可以一直运行,因为flink任务已经提交到hadoop集群里面了。

image-20230409003221865

停止任务

1
2
3
4
此时如果想要停止Flink任务,有两种方式

1:停止yarn中任务(这种是直接停止yarn中flink集群,任务也就停止了)
[root@bigdata04 flink-1.11.1]# yarn application -kill application_17689629561
1
2
2:停止flink任务
可以在界面上点击这个按钮,或者在命令行中执行flink cancel停止都可以

image-20230409003528258

1
2
或者
[root@bigdata04 flink-1.11.1]# bin/flink cancel -yid application_176896295613 d7bo35cf4co10xxxxxxxxxxx(具体任务ID)

image-20230409003917743

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这个flink任务停止之后,对应的那个yarn-session(Flink集群)也就停止了。

注意:此时flink任务停止之后就无法再查看flink的web界面了,如果想看查看历史任务的执行信息就看不了了,怎么办呢?
咱们之前在学习spark的时候其实也遇到过这种问题,当时是通过启动spark的historyserver进程解决
的。
flink也有historyserver进程,也是可以解决这个问题的。
historyserver进程可以在任意一台机器上启动,在这我们选择在bigdata04机器上启动
在启动historyserver进程之前,需要先修改bigdata04中的flink-conf.yaml配置文件

[root@bigdata04 flink-1.11.1]# vi conf/flink-conf.yaml
......
jobmanager.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs/
historyserver.web.address: 192.168.182.103
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
......
1
2
3
4
5
6
然后启动flink的historyserver进程
[root@bigdata04 flink-1.11.1]# bin/historyserver.sh start

验证进程
[root@bigdata04 flink-1.11.1]# jps
5894 HistoryServer
1
2
3
4
5
6
注意:hadoop集群中的historyserver进程也需要启动
在bigdata01、bigdata02、bigdata03节点上启动hadoop的historyserver进程

[root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver
[root@bigdata02 hadoop-3.2.0]# bin/mapred --daemon start historyserver
[root@bigdata03 hadoop-3.2.0]# bin/mapred --daemon start historyserver
1
此时Flink任务停止之后也是可以访问flink的web界面的。

本文标题:大数据开发工程师-第十六周 Flink极速上手篇-实战:流处理和批处理程序开发-2

文章作者:TTYONG

发布时间:2023年04月08日 - 01:04

最后更新:2023年04月20日 - 14:04

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%85%AD%E5%91%A8-Flink%E6%9E%81%E9%80%9F%E4%B8%8A%E6%89%8B%E7%AF%87-%E5%AE%9E%E6%88%98%EF%BC%9A%E6%B5%81%E5%A4%84%E7%90%86%E5%92%8C%E6%89%B9%E5%A4%84%E7%90%86%E7%A8%8B%E5%BA%8F%E5%BC%80%E5%8F%91-2.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%