前言
Apache Flink作为开源的分布式流处理框架,受到了广泛的关注和应用。本文将分享如何从零开始搭建一个Flink运行环境,并在其上运行一个“WordCount”的例子程序。
Flink环境搭建
1. 环境准备
Flink支持在Linux、MacOS和Windows三大平台上部署。本文以Linux环境为例。
需要的软件依赖如下:
- JDK 8或以上版本
- Maven 3.5
- Flink 1.14.5版本
# 安装JDK
yum install -y java-1.8.0-openjdk-devel
# 安装Maven
yum install -y maven
接着下载Flink压缩包进行解压:
代码语言:javascript复制wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
tar -xvf flink-1.14.5-bin-scala_2.12.tgz
2.单机模式运行Flink
单机模式下,JobManager和TaskManager均运行在同一台机器上。
代码语言:javascript复制# 启动JobManager
./bin/start-cluster.sh
# 提交并运行WordCount程序
./bin/flink run examples/streaming/WordCount.jar
本文以单机模式为例进行讲解。实际生产环境中,建议部署在集群模式下运行。
3. 分布式集群模式
在集群模式下,JobManager和TaskManager会部署在不同节点上。
- 首先在一台机器上启动ResourceManager
- 在其他Worker节点上启动TaskManager
- 提交Job到JobManager进行调度和运行
以此实现Flink在分布式环境下高可靠且高性能的计算。
4. 编写WordCount程序
WordCount是一个流式WordCount程序,读取文本源头,以单词为单位进行计数统计。
代码语言:javascript复制// 定义文本源DataStream
DataStream<String> text = env.socketTextStream("localhost", 9999);
//将每行内容切分转换成单词列表
DataStream<String> words = text
.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
String[] split = value.toLowerCase().split("\W ");
// ...
}
});
//按单词进行计数统计
DataStream<Tuple2<String, Long>> counts = words
.keyBy(value -> value)
.sum(1);
//输出结果
counts.print();
5. 运行和结果
编译打包项目,使用FlinkClient提交Job:
代码语言:javascript复制mvn clean package
bin/flink run target/wordcount-1.0-SNAPSHOT.jar
运行程序,使用netcat工具发送输入字符串,可以实时看到统计结果:
代码语言:javascript复制nc localhost 9999
hello world bye
hello again
6.代码示例
这里提供一个完整的WordCount流处理程序代码示例:
代码语言:javascript复制// 导入Flink相关包
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件读取文本行数据源
DataStream<String> text = env.addSource(new MySourceFunction());
// 将每行内容切分成单词
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
String[] splits = value.split("\s ");
for (String word : splits) {
out.collect(word);
}
}
});
// 按单词进行分组计数
DataStream<Tuple2<String, Long>> result = words.keyBy(e -> e)
.timeWindow(Time.seconds(5))
.sum(1);
// 打印最终结果
result.print();
// 执行任务
env.execute("WordCount");
}
// 自定义文本数据源
public static class MySourceFunction implements SourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 从文件或集合读取文本
// ...
ctx.collect("hello world");
}
@Override
public void cancel() {
}
}
}
该示例从文件读取文本行,进行词频统计,并以对象流的方式输出结果。希望能给您一个完整代码实例的参考!
Flink与Yarn集成
Flink可以利用Yarn资源管理器来管理和调度Flink作业的执行。主要有以下步骤:
1. 安装和配置Yarn
安装Hadoop并配置Yarn资源管理器。
2. 配置Flink支持Yarn
修改flink-conf.yaml配置文件,添加如下配置:
代码语言:javascript复制yarn.distributed.enabled: true
3. 打包Flink项目为Yarn应用
代码语言:javascript复制mvn package -Pyarn
4. 提交Flink作业到Yarn
代码语言:javascript复制./bin/flink run -m yarn-cluster -yn 1 -ys 1 /path/to/job.jar
-m 参数指定使用Yarn作为资源管理器,-yn -ys 分配给任务的Container数量。
5. Yarn WebUI监控作业
可以在Yarn ResourceManager WebUI中查看和监控Flink作业状态。
6. 停止和重启作业
使用Flink Cli同样可以停止和重启在Yarn上运行的作业。
与此同时,Yarn也能根据负载自动扩缩容Flink作业上的Container数量。这样实现了Flink与Yarn的良好集成。
通过上述步骤就可以利用Yarn的资源管理能力来管理Flink分布式作业的执行了。
Flink通过时间窗口操作sql
Flink通过Table API和SQL来支持时间窗口的操作。
下面通过一个例子来说明:
1. 定义数据源
导入Flink的TableEnvironment:
代码语言:javascript复制TableEnvironment tableEnv = TableEnvironment.create(env);
从Kafka读取数据注册成Table:
代码语言:javascript复制tableEnv.connect(new FlinkKafkaConsumer<>(...)
.property(...));
2. 定义表结构
使用DDL定义Table结构:
代码语言:javascript复制CREATE TABLE inputTable (
id STRING,
timestamp TIMESTAMP,
...)
WITH (...);
3. 定义窗口
使用TUMBLE或HOP动态时间窗口
代码语言:javascript复制SELECT
id,
COUNT(*)
FROM
inputTable
GROUP BY
TUMBLE(timestamp, INTERVAL '5' MINUTE)
4. 窗口转换
支持窗口函数如SUM、COUNT、MAX等聚合计算:
代码语言:javascript复制SELECT
SUM(amount)
FROM
inputTable
GROUP BY
HOP(timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)
5. 输出结果
将结果输出到Kafka或打印:
代码语言:javascript复制tableEnv.toRetractStream[Row]...
通过Table API和SQL的时间窗口支持,可以更高效地操作和处理时间序列数据流。开发者可以使用熟悉的SQL语法进行流处理。
6. sql任务代码示例
这里提供一个完整的使用SQL实现单词计数的示例:
代码语言:javascript复制// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env);
// 从Kafka读取文本行数据
tableEnv.connect(new FlinkKafkaConsumer<>(...)
.topic("kafka_topic"))
.withFormat(new SimpleStringSchema())
.createTemporaryTable("lines");
// 分词表
tableEnv.executeSql(
"CREATE TABLE words WITH ('connector' = 'upsert', 'url' = '...") AS "
"SELECT "
" ROW_NUMBER() OVER() AS id, "
" word "
"FROM lines, LATERAL(FLATTEN(SPLIT(lines, ' ')))";
// 窗口聚合表
tableEnv.executeSql(
"CREATE TABLE word_counts WITH ('connector' = 'upsert', 'url' = '...'") AS "
"SELECT "
" word, "
" COUNT(*) AS count "
"FROM words "
"GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), word");
// 输出结果
tableEnv.executeSql("INSERT INTO sink SELECT * FROM word_counts");
// 执行程序
env.execute();
这个完整示例包含数据输入、分词、窗口聚合和结果输出的全流程SQL定义。希望对您理解SQL实现流处理过程有帮助。
时间窗口说明
1. 滚动窗口
- 滚动窗口分为定长窗口(TUMBLE)和滑动窗口(HOP)两种。
- 定长窗口将事件锁定到连续的固定大小时间窗口中,窗口不重合。
- 滑动窗口以固定时间间隔滑动,窗口重合部分可重复计算。
2. 窗口分配
- 每条事件根据时间戳分配到对应的窗口份组中。
- 窗口分配采用窗函数TIMESTAMP_WINDOW(timeField,窗口大小)实现。
3. 窗口聚合
- 事件分配完毕后,对每个窗口执行聚合操作(如COUNT、SUM等)。
- 窗口会将中间结果保存在状态后端(如RocksDB)。
4. 窗口结果输出
- 窗口被关闭时(到期),将最终结果输出。
- 也可以提前输出或定期输出中间结果。
5. 状态管理
- 窗口状态会进行快照保存,实现断点续传重启能力。
- 状态由KeyedStateBackend管理,比如RocksDB。
所以Flink时间窗口的原理就是:根据时间戳分配事件到窗口,窗口聚合操作更新状态,窗口关闭时输出结果。它独立于算子,为流处理引入了时间的概念。
6. 同批次时间窗口处理逻辑
如果一次从Kafka拉取的数据中,有一半的数据在当前时间窗口内,一半在窗口外,Flink会进行如下处理:
- 先根据事件时间戳,将数据分配到对应的时间窗口分区组(keyed state)中。
- 对每个时间窗口分区组单独处理:
- 时间窗口内的数据按正常流程进行聚合计算。
- 时间窗口外的数据不会参与当前窗口的聚合,但是会加入该key的back pressure。
- 窗口结果输出时:
- 只输出当前窗口已经关闭的分区组的结果。其他分区组处于开启状态,不会输出。
- 周期性检查窗口状态:
- 关闭那些超出时间范围的过期窗口。
- 对还未到期的窗口继续累积状态,待到期后输出结果。
所以Flink可以正确区分时间窗口内外的数据:
- 窗口内数据参与当前窗口计算
- 窗口外数据加入back pressure,未来窗口处理
- 只输出实际到期窗口的结果
这样保证了时间正确性,不会导致窗口结果计算错误