交互式编程环境:REPL
当前最著名的交互式编程环境莫属Jupyter Notebook了,程序员可以启动一个交互的Session,在这Session中编写代码、执行程序、获取结果,所见即所得。
交互式编程的优势包括:
- 快速启动一个程序:程序员不需要“编译、打包、执行”这一套复杂过程,只需要开启一个交互Session,敲下代码,直接得到结果,非常适合验证一段代码的结果。
- 直接获得程序反馈:使用
print
,可以在交互环境中直接得到程序结果,无需将输出导出到文件或其他位置。
程序员敲入几行程序命令,环境可以立刻给出反馈,因此这种交互式环境非常适合调试程序,尤其对于初学者来说非常友好。交互式编程环境通常被称为REPL(Read-Eval-Print Loop),这种方式跟Linux的命令行非常相似,因此又被称为Shell。支持REPL的语言有Python、R、Matlab、Scala以及Java 9。
我之前经常使用Spark的交互式环境spark-shell,Flink基于Java和Scala,其实也是支持交互式编程的,这里推荐新人使用REPL交互式环境来上手和学习Flink。
注意,Flink的交互式编程环境只支持Scala语言,程序员可以基于Scala语言调用DataStream/DataSet API、Table API & SQL,不支持Java。另外,Flink提供了Python版本的REPL环境,不过目前Flink(1.9)的Python API只支持Table API调用。本文主要展示Scala的REPL的使用方法。
启动REPL环境
下载Flink
前往Flink Downloads页面(https://flink.apache.org/downloads.html)下载编译好的Flink程序,这里根据你的Scala版本、是否需要搭载Hadoop环境等需求来选择适合的版本,没有特殊需求的选择最近版本的Flink即可。下载的tgz包解压缩后,在bin
目录下有一些Flink提供的基础工具。
注意,Flink目前对类Unix系统(Linux、macOS)比较友好,尽管Flink可以在Windows默认命令行上运行,但只限于一些基础命令,比如不支持REPL。Windows用户需要使用Cygwin、Windows Subsystem for Linux等方式来模拟Unix环境。
启动REPL
在命令行里进入Flink解压缩之后的目录,在本地启动一个Flink REPL交互式环境。
代码语言:javascript复制bin/start-scala-shell.sh local
启动后,命令行中会反馈一些注意信息:
验证一下Scala的Hello World:
代码语言:javascript复制scala > 打印(“ Hello World!”)
世界您好!
Scala Shell的使用
使用正确的运行环境
Flink Shell已经支持批处理和流处理两种模式。如上图所示,Flink在这个交互环境中默认提供运行环境Execution Environment,其中批处理为benv
、流处理为senv
。Flink是一个运行在集群上的大数据系统,需要通过运行环境这个入口与集群交互,因此运行环境是Flink程序必不可少的重要元素。
下面演示使用Scala Shell来运行一个最基础的map算子:
代码语言:javascript复制scala > val dataStream:DataStream [ Int ] = senv。fromElements(1,2,- 3,0,5,- 9,8)
dataStream:组织。阿帕奇。flink。流。api。斯卡拉(Scala)。DataStream [ Int ] = org。阿帕奇。flink。流。api。斯卡拉(Scala)。数据流@ 3013e1e8
scala > val lambda = dataStream。map(输入 => 输入 * 2)。列印()
lambda:组织。阿帕奇。flink。流。api。数据流。DataStreamSink [ Int ] = org。阿帕奇。flink。流。api。数据流。DataStreamSink @ 4ff00844
斯卡拉> SENV。执行(“基本地图转换”)
2
4
- 6
0
10
- 18
16
res0:组织。阿帕奇。flink。api。普通的。JobExecutionResult = org。阿帕奇。flink。api。普通的。JobExecutionResult @ 7f59f4e4
我创建了一个数字列表DataStream,然后使用map对每个元素乘以2,并打印出来。
注意,在流处理模式下,print不会自动触发,必须调用execute
才能触发执行前面的程序。
代码拷贝
我们经常遇到的一个使用场景是从网上看到一些代码片段,需要拷贝过来验证正确性。在Scala Shell中,可以使用:paste
命令进入拷贝模式,复制粘贴之后,再使用Control D
按键组合退出粘贴模式。
斯卡拉>:粘贴
//进入粘贴模式(按Ctrl-D完成)
val textStreaming = senv。fromElements(
“成为或不成为-这是一个问题:-”,
“心中难免会受苦”,
“吊索和离谱财富的箭”,
“或采取行动抵御麻烦之海,”)
^ D
//退出粘贴模式,现在正在解释。
textStreaming:组织。阿帕奇。flink。流。api。斯卡拉(Scala)。DataStream [ String ] = org。阿帕奇。flink。流。api。斯卡拉(Scala)。数据流@ 62e8ef9f
使用其他依赖
如果程序依赖了其他包,可以在启动Flink Scala Shell时,加上参数-a <path/to/jar>
或--addclasspath <path/to/jar>
。
例如,我想使用Gson来解析json数据:
代码语言:javascript复制bin / start-scala-shell.sh本地-a /Users/luweizheng/.m2/repository/com/google/code/gson/gson/2.8.5/gson-2.8.5.jar
这样我就能在交互式环境中使用这个包下的各种类和方法了。
绝大多数情况下,我们可能要依赖多个不同的包,这时候需要使用maven-shade-plugin
工具将所依赖包合并到一起,打成一个超级包(uber-jar),超级包内包含了这个程序所有必备的依赖。
使用Flink
Flink Scala Shell也支持扩展模式,包括独立的Flink集成和与其他应用程序共享的纱线实现。
远程链接
使用remote模式,指定JobManager的机器名(IP)和端口号:
代码语言:javascript复制bin / start-scala-shell.sh远程<主机名> <端口号>
纱线
使用这个命令可以在Yarn上部署一个新的Flink集群,并使用其他参数来配置集群信息,比如`-n 2将申请2个TaskManager,其他详细使用方法可以参见下面完整使用手册。
代码语言:javascript复制
bin / start-scala-shell.sh yarn -n 2
完整使用方法
代码语言:javascript复制Flink Scala壳
用法:start-scala-shell.sh [本地|远程|纱线] [选项] <args> ...
命令:本地[选项]
使用本地Flink集群启动Flink Scala Shell
-a <路径/到/罐子> | --addclasspath <路径/到/罐子>
指定在 Flink中使用的其他jar
命令:远程[选项] <主机> <端口>
启动Flink Scala Shell连接到远程集群
<主机>
远程主机名作为字符串
<端口>
远程端口为整数
-a <路径/到/罐子> | --addclasspath <路径/到/罐子>
指定在 Flink中使用的其他jar
命令:yarn [options]
启动Flink Scala外壳连接到纱线簇
-n arg | -容器 arg
要分配的YARN容器数(= TaskManagers数)
-jm arg | --jobManagerMemory arg
存储器为具有可选的单元JobManager容器(默认值:MB)
-nm <值> | --name <值>
为 YARN上的应用程序 设置自定义名称
-qu <arg> | --queue <arg>
指定YARN队列
-s <arg> | --slots <arg>
每个TaskManager的插槽数
-tm <arg> | --taskManagerMemory <arg>
每个TaskManager容器的内存,带可选单位(默认值:MB)
-a <路径/到/罐子> | --addclasspath <路径/到/罐子>
指定在 Flink中使用的其他jar
--configDir <值>
配置目录。
-h | - 救命
打印此用法文本