第十六周 Flink极速上手篇-实战:流处理和批处理程序开发-2
Flink快速上手使用
1 | 创建maven项目,因为要使用scala编写代码,在src里main里除了java目录,还要创建scala目录,再创建包 |

1 | 接下来在pom.xml 中引入flink相关依赖,前面两个是针对java代码的,后面两个是针对scala代码的,最后一个依赖是这对flink1.11这个版本需要添加的 |
1 | <dependency> |
Flink Job开发步骤
1 | 在开发Flink程序之前,我们先来梳理一下开发一个Flink程序的步骤 |
Streaming WordCount
1 | 需求 |
scala
1 | package com.imooc.scala |
1 | 注意:在idea等开发工具里面运行代码的时候需要把pom.xml中的scope配置注释掉 |

1 | 在bigdata04上面开启socket |

1 | 注意:此时代码执行的时候下面会显示一些红色的log4j的警告信息,提示缺少相关依赖和配置 |

1 | 此时再执行就没有红色的警告信息了,但是使用info日志级别打印的信息太多了,所以将log4j中的日志级别配置改为error级别 |

java
1 | package com.imooc.java; |
Batch WordCount
1 | 需求:统计指定文件中单词出现的总次数 |
scala
1 | package com.imooc.scala |
1 | 注意:这里面执行setParallelism(1)设置并行度为1是为了将所有数据写到一个文件里面,我们查看结果的时候比较方便(此时输出路径会变成一个文件) |
1 | 在pom.xml文件中增加 |

java
1 | package com.imooc.java; |
Flink Streaming和Batch的区别
1 | 流处理Streaming |
Flink集群安装部署
1 | Flink支持多种安装部署方式 |
standalone
1 | 下面我们来看一下standalone模式 |

1 | 依赖环境 |

1 | 由于目前Flink各个版本之间差异比较大,属于快速迭代阶段,所以在这我们就使用最新版本了,使用Flink1.11.1版本。 |
1 | 下面开始安装Flink集群 |
1 | 3:将修改完配置的flink目录拷贝到其它两个从节点 |
1 | 5:验证一下进程 |
1 | 在bigdata02上执行jps |
1 | 6:访问Flink的web界面 |
Standalone集群核心参数
1 | 参数 解释 |
1 | 1:slot是静态的概念,是指taskmanager具有的并发执行能力 |
Flink ON YARN
1 | Flink ON YARN模式就是使用客户端的方式,直接向Hadoop集群提交任务即可。不需要单独启动Flink进程。 |
Flink ON YARN两种使用方式

1 | 在工作中建议使用第二种方式。 |
Flink ON YARN第一种方式
1 | 下面来看一下第一种方式 |
1 | 下面来具体演示一下 |

1 | 接下来,使用yarn-session.sh在YARN中创建一个长时间运行的Flink集群 |
1 | 这个表示创建一个Flink集群,-jm是指定主节点的内存,-tm是指定从节点的内存,-d是表示把这个进程放到后台去执行。 |

1 | 此时到YARN的web界面中确实可以看到这个flink集群。 |


1 | 可以使用屏幕中显示的flink的web地址或者yarn中这个链接都是可以进入这个flink的web界面的 |

1 | 接下来向这个Flink集群中提交任务,此时使用Flink中的内置案例 |
1 | 注意:这个时候我们使用flink run的时候,它会默认找这个文件,然后根据这个文件找到刚才我们创建的那个永久的Flink集群,这个文件里面保存的就是刚才启动的那个Flink集群在YARN中对应的applicationid。 |


1 | 任务提交上去执行完成之后,再来看flink的web界面,发现这里面有一个已经执行结束的任务了。 |

1 | 注意:这个任务在执行的时候,会动态申请一些资源执行任务,任务执行完毕之后,对应的资源会自动释放掉。 |
1 | 最后把这个Flink集群停掉,使用yarn的kill命令 |
1 | 针对yarn-session命令,它后面还支持一些其它参数,可以在后面传一个-help参数 |

1 | 注意:这里的-j是指定Flink任务的jar包,此参数可以省略不写也可以 |
Flink ON YARN第二种方式
1 | flink run -m yarn-cluster(创建Flink集群+提交任务) |

1 | 针对Flink命令的一些用法汇总 |

Flink ON YARN的好处
1 | 1:提高大数据集群机器的利用率 |
向集群中提交Flink任务
1 | 接下来我们希望把前面我们自己开发的Flink任务提交到集群上面,在这我就使用flink on yarn的第二种方式来向集群提交一个Flink任务。 |
1 | <build> |
1 | 注意:需要将Flink和Hadoop的相关依赖的score属性设置为provided,这些依赖不需要打进jar包里面。 |
1 | 第二步:生成jar包: mvn clean package -DskipTests |
1 | 此时到yarn上面可以看到确实新增了一个任务,点击进去可以看到flink的web界面 |

1 | 通过socket输入一串内容 |






1 | 接下来我们希望把这个任务停掉,因为这个任务是一个流处理的任务,提交成功之后,它会一直运行。 |

停止任务
1 | 此时如果想要停止Flink任务,有两种方式 |
1 | 2:停止flink任务 |

1 | 或者 |

1 | 这个flink任务停止之后,对应的那个yarn-session(Flink集群)也就停止了。 |
1 | 然后启动flink的historyserver进程 |
1 | 注意:hadoop集群中的historyserver进程也需要启动 |
1 | 此时Flink任务停止之后也是可以访问flink的web界面的。 |