1.环境说明
在前面几期的课程里面讲过了 Flink 开发环境的搭建和应用的部署以及运行,今天的课程主要是讲 Flink 的客户端操作。本次讲解以实际操作为主。这次课程是基于社区的 Flink 1.7.2 版本,操作系统是 Mac 系统,浏览器是 Google Chrome 浏览器。有关开发环境的准备和集群的部署,请参考「开发环境搭建和应用的配置、部署及运行」的内容。
2.课程概要
如下图所示,Flink 提供了丰富的客户端操作来提交任务和与任务进行交互,包括 Flink 命令行,Scala Shell,SQL Client,Restful API 和 Web。Flink 首先提供的最重要的是命令行,其次是 SQL Client 用于提交 SQL 任务的运行,还有就是 Scala Shell 提交 Table API 的任务。同时,Flink 也提供了Restful 服务,用户可以通过 http 方式进行调用。此外,还有 Web 的方式可以提交任务。
在 Flink 安装目录的 bin 目录下面可以看到有 flink, start-scala-shell.sh 和 sql-client.sh 等文件,这些都是客户端操作的入口。
3.Flink 客户端操作
3.1 Flink 命令行
Flink 的命令行参数很多,输入 flink - h 能看到完整的说明:
1 | flink-1.7.2 bin/flink -h |
如果想看某一个命令的参数,比如 Run 命令,输入:
1 | flink-1.7.2 bin/flink run -h |
本文主要讲解常见的一些操作,更详细的文档请参考: Flink 命令行官方文档。
3.1.1 Standalone
首先启动一个 Standalone 的集群:
1 | flink-1.7.2 bin/start-cluster.sh |
打开 http://127.0.0.1:8081 能看到 Web 界面。
Run
运行任务,以 Flink 自带的例子 TopSpeedWindowing 为例:
1 | flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar |
运行起来后默认是 1 个并发:
点左侧「Task Manager」,然后点「Stdout」能看到输出日志:
或者查看本地 Log 目录下的 *.out 文件:
List
查看任务列表:
1 | flink-1.7.2 bin/flink list -m 127.0.0.1:8081 |
Stop
停止任务。通过 -m 来指定要停止的 JobManager 的主机地址和端口。
1 | flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb |
从日志里面能看出 Stop 命令执行失败了。一个 Job 能够被 Stop 要求所有的 Source 都是可以 Stoppable 的,即实现了 StoppableFunction 接口。
1 | /** |
Cancel
取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir,会保存 Savepoint,否则不会保存 Savepoint。
1 | flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de |
也可以在停止的时候显示指定 Savepoint 目录。
1 | flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759 |
取消和停止(流作业)的区别如下:
- cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。
- stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 Source 实现了 StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 Source 都将接收 stop() 方法调用。直到所有 Source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。
Savepoint
触发 Savepoint。
1 | flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint |
说明:Savepoint 和 Checkpoint 的区别(详见文档):
- Checkpoint 是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;Checkpoint 是作业 failover 的时候自动使用,不需要用户指定。
- Savepoint 是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。Savepoint 一般用于程序的版本更新(详见文档),Bug 修复,A/B Test 等场景,需要用户指定。
通过 -s 参数从指定的 Savepoint 启动:
1 | flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar |
查看 JobManager 的日志,能够看到类似这样的 Log:
1 | 2019-03-28 10:30:53,957 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator |
Modify
修改任务并行度。
为了方便演示,我们修改 conf/flink-conf.yaml 将 Task Slot 数从默认的 1 改为 4,并配置 Savepoint 目录。(Modify 参数后面接 -s 指定 Savepoint 路径当前版本可能有 Bug,提示无法识别)
1 | taskmanager.numberOfTaskSlots: 4 |
修改参数后需要重启集群生效,然后再启动任务:
1 | flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh |
从页面上能看到 Task Slot 变为了 4,这时候任务的默认并发度是 1。
通过 Modify 命令依次将并发度修改为 4 和 3,可以看到每次 Modify 命令都会触发一次 Savepoint。
1 | flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa |
查看 JobManager 的日志,可以看到:
1 | 2019-06-17 09:05:11,179 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e () |
Info
Info 命令是用来查看 Flink 任务的执行计划(StreamGraph)的。
1 | flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar |
拷贝输出的 Json 内容,粘贴到这个网站:http://flink.apache.org/visualizer/
可以和实际运行的物理执行计划对比:
3.1.2 Yarn per-job
单任务 Attach 模式
默认是 Attach 模式,即客户端会一直等待直到程序结束才会退出。
- 通过 -m yarn-cluster 指定 Yarn 模式
- Yarn 上显示名字为 Flink session cluster,这个 Batch 的 Wordcount 任务运行完会 FINISHED。
- 客户端能看到结果输出
1 | [admin@z17.sqa.zth /home/admin/flink/flink-1.7.2] |
如果我们以 Attach 模式运行 Streaming 的任务,客户端会一直等待不退出,可以运行以下的例子试验下:
1 | ./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar |
单任务 Detached 模式
- 由于是 Detached 模式,客户端提交完任务就退出了
- Yarn 上显示为 Flink per-job cluster
1 | $./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar |
3.1.3 Yarn session
启动 Session
1 | ./bin/yarn-session.sh -tm 2048 -s 3 |
表示启动一个 Yarn session 集群,每个 TM 的内存是 2 G,每个 TM 有 3 个 Slot。(注意:-n 参数不生效)
1 | flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3 |
客户端默认是 Attach 模式,不会退出:
- 可以 ctrl + c 退出,然后再通过 ./bin/yarn-session.sh -id application_1532332183347_0726 连上来;
- 或者启动的时候用 -d 则为 detached 模式
Yarn 上显示为 Flink session cluster;
- 在本机的临时目录(有些机器是 /tmp 目录)下会生成一个文件:
1 | flink-1.7.2 cat /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu |
提交任务
1 | ./bin/flink run ./examples/batch/WordCount.jar |
将会根据 /tmp/.yarn-properties-admin 文件内容提交到了刚启动的 Session。
1 | flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar |
运行结束后 TM 的资源会释放。
提交到指定的 Session
通过 -yid 参数来提交到指定的 Session。
1 | $./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar |
注:Blink版本 的 Session 与 Flink 的 Session 的区别:
- Flink 的 session -n 参数不生效,而且不会提前启动 TM;
- Blink 的 session 可以通过 -n 指定启动多少个 TM,而且 TM 会提前起来;
3.2 Scala Shell
官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/scala_shell.html
3.2.1 Deploy
Local
1 | $bin/start-scala-shell.sh local |
任务运行说明:
- Batch 任务内置了 benv 变量,通过 print() 将结果输出到控制台;
- Streaming 任务内置了 senv 变量,通过 senv.execute(“job name”) 来提交任务,且 Datastream 的输出只有在 Local 模式下打印到控制台;
Remote
先启动一个 yarn session cluster:
1 | $./bin/yarn-session.sh -tm 2048 -s 3 |
启动 scala shell,连到 jm:
1 | $bin/start-scala-shell.sh remote z054.sqa.net 28665 |
Yarn
1 | $./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn |
按 CTRL + C 退出 Shell 后,这个 Flink cluster 还会继续运行,不会退出。
3.2.2 Execute
DataSet
1 | flink-1.7.2 bin/stop-cluster.sh |
对 DataSet 任务来说,print() 会触发任务的执行。
也可以将结果输出到文件(先删除 /tmp/out1,不然会报错同名文件已经存在),继续执行以下命令:
1 | scala> counts.writeAsText("/tmp/out1") |
查看 /tmp/out1 文件就能看到输出结果。
1 | flink-1.7.2 cat /tmp/out1 |
DataSteam
1 | scala> val textStreaming = senv.fromElements("To be, or not to be,--that is the question:--") |
对 DataStream 任务,print() 并不会触发任务的执行,需要显示调用 execute(“job name”) 才会执行任务。
TableAPI
在 Blink 开源版本里面,支持了 TableAPI 方式提交任务(可以用 btenv.sqlQuery 提交 SQL 查询),社区版本 Flink 1.8 会支持 TableAPI: https://issues.apache.org/jira/browse/FLINK-9555
3.3 SQL Client Beta
SQL Client 目前还只是测试版,处于开发阶段,只能用于 SQL 的原型验证,不推荐在生产环境使用。
3.3.1 基本用法
1 | flink-1.7.2 bin/start-cluster.sh |
Select 查询
1 | Flink SQL> SELECT 'Hello World'; |
按 ”Q” 退出这个界面
打开 http://127.0.0.1:8081 能看到这条 Select 语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的 Custom Source,输出用的是 Stream Collect Sink,且只输出一条结果。
注意:如果本机的临时目录存在类似 .yarn-properties-baoniu 的文件,任务会提交到 Yarn 上。
Explain
Explain 命令可以查看 SQL 的执行计划。
1 | Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name; |
3.3.2 结果展示
SQL Client 支持两种模式来维护并展示查询结果:
- table mode: 在内存中物化查询结果,并以分页 table 形式展示。用户可以通过以下命令启用 table mode;
1 | SET execution.result-mode=table |
- changlog mode: 不会物化查询结果,而是直接对 continuous query 产生的添加和撤回(retractions)结果进行展示。
1 | SET execution.result-mode=changelog |
接下来通过实际的例子进行演示。
Table mode
1 | Flink SQL> SET execution.result-mode=table; |
运行结果如下图所示:
Changlog mode
1 | Flink SQL> SET execution.result-mode=changelog; |
运行结果如下图所示:
其中 ‘-’ 代表的就是撤回消息。
3.3.3 Environment Files
目前的 SQL Client 还不支持 DDL 语句,只能通过 yaml 文件的方式来定义 SQL 查询需要的表,UDF 和运行参数等信息。
首先,准备 env.yaml 和 input.csv 两个文件。
1 | flink-1.7.2 cat /tmp/env.yaml |
启动 SQL Client:
1 | flink-1.7.2 ./bin/sql-client.sh embedded -e /tmp/env.yaml |
使用 insert into 写入结果表:
1 | Flink SQL> insert into MyTableSink select * from MyTableSource; |
查询生成的结果数据文件:
1 | flink-1.7.2 cat /tmp/output.csv |
也可以在 Environment 文件里面定义 UDF,在 SQL Client 里面通过 「HOW FUNCTIONS」查询和使用,这里就不再说明了。
SQL Client 功能社区还在开发中,详见 FLIP-24。
3.4 Restful API
接下来我们演示如何通过 Rest API 来提交 Jar 包和执行任务。
更详细的操作请参考 Flink 的 Restful API 文档:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
1 | flink-1.7.2 curl http://127.0.0.1:8081/overview |
Restful API 还提供了很多监控和 Metrics 相关的功能,对于任务提交的操作也支持的比较全面。
3.5 Web
在 Flink Dashboard 页面左侧可以看到有个「Submit new Job」的地方,用户可以上传 Jar 包和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用。
4.结束
本期的课程到这里就结束了,我们主要讲解了 Flink 的 5 种任务提交的方式。熟练掌握各种任务提交方式,有利于提高我们日常的开发和运维效率。
视频回顾:https://zh.ververica.com/developers/flink-training-course2/