官网:https://flink.apache.org/
一、Flink的重要特点
1)事件驱动型(Event-driven)
事件驱动的应用程序
是一个有状态的应用程序,它从一个或多个事件流接收事件,并通过触发计算、状态更新或外部操作对传入事件作出反应。事件驱动应用程序
是传统应用程序设计的一种发展,它具有分离的计算和数据存储层。在这种体系结构中,应用程序从远程事务数据库读取数据并将其持久化。- 相反,
事件驱动应用程序
基于有状态流处理应用程序。在这个设计中,数据和计算被放在同一个位置,从而产生本地(内存或磁盘)数据访问。容错是通过定期将检查点写入远程持久存储来实现的。下图描述了传统应用程序体系结构与事件驱动应用程序之间的区别。
kafka作为消息队列就是一种典型的事件驱动型应用。
2) 流、批(stream,micro-batching)
Spark
中,一切都是批次组成的,离线数据是一个大批次,实时数据是一个个无限的小批次组成的。Flink
中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流。
3)分层API
越顶层越抽象,最高层级的抽象是SQL。 越底层越具体
二、Flink使用(word count)
1、设置pom文件
注意下面的依赖设置,使用的是scala 2.12.x版本,Flink版本为1.10.1
xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.buildworld.flink</groupId>
<artifactId>FlinkTrain</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将Scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<executions>
<execution>
<!-- 声明绑定到maven的compile阶段 -->
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2、编写scala代码
1)批处理 wordcount
scala
代码语言:javascript复制package cn.buildworld.flink
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
// 批处理的word count
object WordCount {
def main(args: Array[String]): Unit = {
//创建一个批处理的执行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//从文件中读取数据
val inputPath = "D:\Java\project\Scala\FlinkTrain\src\main\resources\hello.txt"
val dataSet: DataSet[String] = env.readTextFile(inputPath)
// 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计
val resultDataSet: DataSet[(String, Int)] = dataSet
.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0) //以第一个元素为key进行分组
.sum(1) //对所有数据的第二个元素求和
resultDataSet.print()
}
}
2)流处理wordcount
超级简单,比sparkstreaming的流式处理简单多了!!!
scala
代码语言:javascript复制import org.apache.flink.streaming.api.scala._
/**
* 流处理的word count
*
*/
object WordCountByStream {
def main(args: Array[String]): Unit = {
//创建一个批处理的执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 设置并行度
env.setParallelism(6)
//从端口中读取数据
val dataSet: DataStream[String] = env.socketTextStream("192.168.162.102", 7777)
// 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计
val resultDataSet = dataSet
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0) //以第一个元素为key进行分组
.sum(1) //对所有数据的第二个元素求和
resultDataSet.print()
// 启动任务执行
env.execute()
}
}
补充
scala
代码语言:javascript复制import org.apache.flink.api.java.utils.ParameterTool
//可以冲启动参数里面读取指定的参数
val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = parameterTool.get("host")
val port: Int = parameterTool.getInt("port")