Deployment and Testing
Download
https://archive.apache.org/dist/flink/
This article uses Flink 1.9.3 as the example version.
Running Flink requires Java 8 or later. Check the installed version with:
java -version
Run Flink
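The start command is not shown in the original text; with Flink 1.9.x on Windows, a local cluster can typically be started with the bundled batch script (the install path below matches the one used later in this article):

cd D:\Tools\bigdata\flink-1.9.3\bin
start-cluster.bat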
Access the Flink UI
Flink ships with a web UI for monitoring the status of Flink jobs: http://localhost:8081/
Testing
Run the bundled WordCount example
As an example, count the words in the README.txt file that ships with Flink.
Commands:
D:
cd D:\Tools\bigdata\flink-1.9.3\bin
.\flink.bat run ..\examples\batch\WordCount.jar -input ..\README.txt -output ..\README_CountWord_Result.txt
Check completed jobs at:
http://localhost:8081/#/job/completed
Development
You do not need to deploy a local Flink environment during development.
Create the project
Create a project named WordCount.
Right-click the project name WordCount and, in the pop-up menu, click Add Framework Support.
Right-click the java directory, choose Refactor, then Rename, and change the java directory name to scala.
Add the WordCount class
In the IDEA window, open pom.xml, clear its contents, and enter the following:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.psvmc</groupId>
<artifactId>WordCount</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.9.3</flink.version>
<scala.version>2.12.15</scala.version>
</properties>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>https://maven.aliyun.com/repository/public</url>
</repository>
</repositories>
<dependencies>
<!-- flink核心API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Testing
Create the test file D:\bigdata_study\stu_list.txt with the following content:
10010,张三,女,16,IS
10011,李四,男,18,IS
10012,王五,男,19,IS
10013,赵六,女,15,CS
Then open the WordCount.scala code file, clear its contents, and enter the following:
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}

object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    // Set the user name to avoid permission errors
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val stu_list: DataSet[Student] = environment.readCsvFile[Student](
      filePath = "file:///D:/bigdata_study/stu_list.txt",
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      quoteCharacter = null,
      ignoreFirstLine = false,
      ignoreComments = "#",
      lenient = false,
      includedFields = Array[Int](0, 1, 2, 3, 4),
      pojoFields = Array[String]("id", "name", "sex", "age", "department")
    )
    println("-------------原数据----------")
    stu_list.print
  }
}
Run it and the output should be:
-------------原数据----------
Student(10012,王五,男,19,IS)
Student(10010,张三,女,16,IS)
Student(10013,赵六,女,15,CS)
Student(10011,李四,男,18,IS)
Package and run
On the right side of the IDEA window, click the Maven icon to open the Maven tool window.
In the Maven tool window, click package to build the application into a JAR.
Then, in the project tree on the left of the IDEA window, under the target directory, you will see two generated JAR files: WordCount-1.0.jar and WordCount-1.0-jar-with-dependencies.jar.
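The build can also be run from the command line, and the fat JAR can then be submitted to the local cluster started earlier. This is a sketch under the paths assumed in this article; because the pom above does not set a main class in the JAR manifest, the entry class is passed with -c:

cd <project directory>
mvn clean package
D:\Tools\bigdata\flink-1.9.3\bin\flink.bat run -c WordCount target\WordCount-1.0-jar-with-dependencies.jar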
Syntax
Loading data
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}

object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    // Set the user name to avoid permission errors
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val env = ExecutionEnvironment.getExecutionEnvironment
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "Wilson", 178.8),
      (17, "Edith", 168.8),
      (18, "Joyce", 174.8),
      (18, "May", 195.8),
      (18, "Gloria", 182.7),
      (21, "Jessie", 184.8)
    )
    stu.print
  }
}
Loading from a file
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}

object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    // Set the user name to avoid permission errors
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val stu_list: DataSet[Student] = environment.readCsvFile[Student](
      filePath = "file:///D:/bigdata_study/stu_list.txt",
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      quoteCharacter = null,
      ignoreFirstLine = false,
      ignoreComments = "#",
      lenient = false,
      includedFields = Array[Int](0, 1, 2, 3, 4),
      pojoFields = Array[String]("id", "name", "sex", "age", "department")
    )
    println("-------------原数据----------")
    stu_list.print
  }
}
Exporting data
Export to HDFS
// Write to HDFS (WriteMode comes from org.apache.flink.core.fs.FileSystem.WriteMode)
val output2 = "hdfs://bdedev/flink/Student002.csv"
ds2.writeAsCsv(output2, rowDelimiter = "\n", fieldDelimiter = "|||", writeMode = WriteMode.OVERWRITE)
env.execute()
Export to a local file
// Write to a local file
val output2 = "file:///D:/bigdata_study/result001.txt"
ds3.writeAsCsv(output2, rowDelimiter = "\n", fieldDelimiter = ",", writeMode = WriteMode.OVERWRITE)
env.execute()
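The ds2 and ds3 snippets above are fragments from a larger job. As a self-contained sketch (the file path and job name below are illustrative), note that writeAsCsv is a lazy sink: nothing is written until env.execute() runs.

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object ExportDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromElements(("good", 2), ("day", 2), ("up", 1))
    // writeAsCsv only registers the sink; env.execute() actually runs the job and writes the file
    ds.writeAsCsv("file:///D:/bigdata_study/result_demo.csv",
      rowDelimiter = "\n", fieldDelimiter = ",", writeMode = WriteMode.OVERWRITE)
    env.execute("export demo")
  }
}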
Transformations
Flink's transformations fall into four main groups: basic single-stream transformations, key-based grouping transformations, multi-stream transformations, and data redistribution transformations.
Basic single-stream transformations
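The original text does not include an example here; below is a minimal sketch of the common single-stream operators map, filter, and flatMap on a DataSet (the data values are illustrative):

import org.apache.flink.api.scala._

object BasicTransforms {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val nums: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)
    // map: one output element per input element
    nums.map(_ * 2).print()
    // filter: keep only elements matching the predicate
    nums.filter(_ % 2 == 0).print()
    // flatMap: each input element may produce zero or more outputs
    env.fromElements("good good study", "day day up")
      .flatMap(_.split(" "))
      .print()
  }
}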
Key-based grouping transformations
Grouping data mainly prepares it for subsequent aggregations, i.e. aggregate analysis over the records in each group. groupBy turns a DataSet into a GroupedDataSet, and an aggregation turns the GroupedDataSet back into a DataSet. If each element has type T before the aggregation, the element type after the aggregation is still T.
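A minimal sketch of this round trip, assuming the env setup from the earlier examples (i.e. inside a main method with import org.apache.flink.api.scala._; the data values are illustrative):

val ds: DataSet[(String, Int)] = env.fromElements(("a", 1), ("a", 2), ("b", 3))
val grouped: GroupedDataSet[(String, Int)] = ds.groupBy(0) // DataSet -> GroupedDataSet
val result: DataSet[(String, Int)] = grouped.sum(1)        // aggregation: GroupedDataSet -> DataSet
result.print() // element type is unchanged: still (String, Int)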
aggregation
Common aggregation operations include sum, max, and min; collectively they are called aggregations. An aggregation takes a parameter that specifies which field to aggregate on. As with groupBy, the field can be specified by its numeric position or by its name.
Unlike in batch processing, on a data stream these aggregation functions are applied continuously: stream records enter Flink one after another, and the aggregation is computed over all data that has flowed in so far. The sum operator adds up the specified field and stores the result in that field. With min, the values of the other fields in the result are not guaranteed.
val tupleStream = env.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
tupleStream.groupBy(0).sum(1).print()
Group by field 0 and sum field 1.
Result:
(1,3,8)
(0,3,2)
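To illustrate the earlier note that min only guarantees the aggregated field, here is a sketch contrasting min with minBy, which returns a complete existing element; it assumes the same env as the snippet above, and the data values are illustrative:

val scores = env.fromElements(
  (1, "a", 10), (1, "b", 3),
  (2, "c", 7), (2, "d", 9)
)
// min(2): field 2 holds each group's minimum; the other fields are not guaranteed to come from the same record
scores.groupBy(0).min(2).print()
// minBy(2): returns the whole tuple that contains the minimum of field 2, e.g. (1,b,3) and (2,c,7)
scores.groupBy(0).minBy(2).print()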
reduce
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Set the user name to avoid permission errors
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1: DataSet[String] = env.fromElements(
      "good good study", "day day up"
    )
    val group_ds = ds1.flatMap(line => line.split(" ")).map(word => (word, 1)).groupBy(0)
    val ds3 = group_ds.reduce((a, b) => (a._1, a._2 + b._2))
    ds3.sortPartition(0, Order.ASCENDING).print
  }
}
Result:
(up,1)
(day,2)
(good,2)
(study,1)