目录
背景
手把手环境搭建
- Flink安装
- Kafka安装
- HBase安装
一个Flink程序串起来的知识点
- Kafka Producer生产者
- 为Flink运行准备Producer消息流
- Flink访问Kafka,持久化到HBase
- 流式运行环境变量ExecutionEnvironment设置
- CheckPoint机制与参数设置
- FlinkKafka消费者参数设置
- 常用流式处理Operator算子
- Window窗口详解
- Watermark水印详解
- 写入HBase Sink实现
总结
背景
ApacheFlink是一个框架和分布式处理引擎,用于在无限和有界数据流上进行有状态计算。Flink被设计成在所有常见的集群环境中运行,以内存速度和任何规模执行计算。
本篇文章从实用性入手,从Kafka消息系统获取消息,经过Flink解析计算,并将计算结果储存到HBase场景为例子。
首先从Kafka、Flink、HBase环境的手把手安装;再到Kafka生产者Producer程序实现及参数讲解,为Flink引擎计算准备消息数据源;再到Flink Table API和SQL及DataStream各自执行环境变量设置;再到Flink CheckPoint机制与Spark Streaming CheckPoint机制比较及原理讲解;再到FlinkConsumer参数设置讲解;再到常用map、flatMap,coGroup、Join等算子的讲解与应用;窗口Window与水印Watermark种类和机制的实例讲解;最后到写入Hbase的Sink实现,整个程序生命周期把Flink常用的大部分知识点全覆盖串联讲解。
这也是笔者关于Flink优化器原理与源码解析系列文章,此篇文章内容将多,希望有个好的开端。之后会进入Flink优化器、Flink SQL和Table API实现、Flink亮点功能的源码解析。
手把手环境准备
本篇文章环境准备都是基于Mac本地为伪分布式环境,Flink版本1.9.1、Kafka2.3.1、HBase1.3.5,这些本地安装较为简单,最重要的是能使读者较快上手,并通过一个可执行的例子把Flink知识点串起来。
- Flink安装
1. Mac本地Flink安装步骤:
1) java -version
查看JDK版本,最低1.8,还没有安装jdk的童鞋,请自行谷歌搜索安装步 骤,这不再赘述。
2) brew install apache-flink
安装Flink目前是版本1.9.1,执行这个命令后,坐等安装完整即可。
3) ./bin/start-cluster.sh
2. 启动脚本:
./usr/local/Cellar/apache-flink/1.9.1/libexec/bin/start-cluster.sh
3. 本地集群访问界面:
http://localhost:8081
- Kafka安装
1. Mac本地Kafka安装步骤:
1)brew install kafka
安装命令,当前Kafka版本2.3.1,查看安装目 录/usr/local/Cellar/kafka/2.3.1/libexec/bin
2)启动zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
3)启动kafka
kafka-server-start /usr/local/etc/kafka/server.properties &
4)创建topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic houyi
如创建一个名为“houyi”的topic,单个分区和只有一个副本
5)查看topic
kafka-topics --list --zookeeper localhost:2181
6)发送消息
sh kafka-console-producer.sh --broker-list localhost:9092 --topic houyi
Kafka提供了一个命令行客户端,它将从文件或标准输入接收输入,并将其作 为消息发送到Kafka集群。默认情况下,每行都将作为单独的消息发送。运行生产者,然后在控制台中键入一些消息发送到服务器。
7)消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic houyi --from-beginning
Kafka还有一个命令行消费者,将消息转储到标准输出。
- HBase安装
1. Mac本地安装HBase
1)安装命令:brew install hbase
配置HBase cd /usr/local/Cellar/hbase/1.3.5/libexec/conf
配置JAVA_HOMEvim hbase-env.sh添加export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home"
export HBASE_MANAGES_ZK=true
2)在conf/hbase-site.xml设置HBase的核心配置
$ vim hbase-site.xml
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file:///usr/local/var/hbase</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/usr/local/var/zookeeper</value>
</property>
<property>
<name>hbase.zookeeper.dns.interface</name>
<value>lo0</value>
</property>
<property>
<name>hbase.regionserver.dns.interface</name>
<value>lo0</value>
</property>
<property>
<name>hbase.master.dns.interface</name>
<value>lo0</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>
</configuration>
3)HBase的启动
cd /usr/local/Cellar/hbase/1.3.5/bin/start-hbase.shsh /usr/local/Cellar/hbase/1.3.5/bin/start-hbase.sh
4)启动hbase shell
cd /usr/local/Cellar/hbase/1.3.5/bin/hbase shell
5)创建表dqc_check_result,列族completeness
create 'dqc_check_result', 'completeness'
一个Flink程序串起来的知识点
- Kafka Producer生产者
为Flink消费准备消息流的功能,每毫米产生一个HH:mm:ss:SSS时间戳以timestamp为key的json字符串。
代码语言:javascript复制public class EKafkaProducer {
public static void main(String[] args) throws InterruptedException {
String topicName = "houyi";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer(props);
while(true){
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss:SSS");
SimpleDateFormat hourSDF = new SimpleDateFormat("HH");
String time = String.format("%s%s%s","{"timestamp":"",sdf.format(date),""}");
String hour = hourSDF.format(date);
producer.send(new ProducerRecord<>(topicName,hour,time));
Thread.sleep(1);
}
// producer.close();
}
}
acks参数
acks 参数指定了必须有多少个分区副本收到消息,才视为消息写入是成功。该参数选项如下:
- acks=0
Producer生产者在成功写入消息之前不会等待来自服务器的响应。其不能保证消息不会丢失,但因为生产者不需要等待服务器的响应,所以这种很高的吞吐量。
- acks=1
只要集群主节点收到消息,Producer生产者就会收到一个来自服务器的成功响应。如果消息无法到达主节点,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。吞吐量取决于使用的是同步发送还是异步发送。
- acks=all
只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。但是它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
buffer.memory参数
该参数用来设置Producer生产者内存缓冲区的大小,用它缓冲要发送到服务器的消息。
如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候, send() 方法调用要么被阻塞,要么抛出异常,取决于max.block.ms参数的设置,表示在抛出异常之前可以阻塞的最长时间。
retries参数
Producer生产者从服务器收到的错误有可能是临时性的错误,如分区找不到主节点。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果超过这个次数阈值,则生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,也可以通过 retry.backoff.ms 参数来改变这个时间间隔。
batch.size参数
该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。如果批次大小设置过大,会占用更多的内存而已。如果设置得太小,因为生产者频繁地发送消息,会增加一些额外的开销。
linger.ms参数
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。
Producer生产者会在批次填满或 linger.ms 达到上限时把批次发送出去。默认地只要有可用的线程,就算批次里只有一个消息,生产者也会把消息发送出去。此参数是需要平衡延迟和吞吐量的。
- Flink访问Kafka,持久化到HBase程序
- 流式运行环境变量ExecutionEnvironment设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);//设置并行度
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置处理时间
关于运行环境变量还有其他类型:
- StreamExecutionEnvironment :流式运行环境变量。
如DataStream<String> stream = env.addSource(EKafkaConsumer);接入流式数据源
- ExecutionEnvironment:批式数据运行环境变量
DataSet数据程序是在数据集上实现转换的常规程序(例如filtering过滤、mapping映射、joining连接、grouping分组)。DataSet数据集是从某些确定的数据源(例如,通过读取文件或从本地集合)创建的。如本地集合创建的数据集
代码语言:javascript复制DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
- TableEnvironment:Table运行时环境,又分StreamTableEnvironment流查询运行环境和BatchTableEnvironment批查询运行环境。
- StreamTableEnvironment流查询运行环境:
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
代码语言:javascript复制
代码语言:javascript复制ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);、
Flink SQL 是基于 Apache Calcite 的实现的,Calcite 实现了 SQL 标准解析。SQL 查询是一个完整的 sql 字符串来查询。一条 stream/batch sql 从提交到 calcite 解析、验证、优化到物理执行计划再到Flink 引擎执行,一般分为以下几个阶段:
1)Sql Parser: 将 sql 语句解析成一个逻辑树, 在 calcite 中用 SqlNode 表示逻辑树 ;
2)Sql Validator: 结合 catalog 去验证 sql 语法;
3)生成 Logical Plan: 将 sqlNode 表示的逻辑树转换成 Logical Plan, 用 relNode 表示 ;
4)生成 optimized Logical Plan: 先基于 calcite rules 去优化 logical Plan,
5)基于 Flink 定制的一些优化 规则rules 去优化 logical Plan;
6)生成 Flink Physical Plan基于 Flink 的 优化规则rules ,将 optimized Logical Plan 转成成 Flink 的物理执行计划;
7)将物理执行计划转成 Flink Execution Plan调用相应的 tanslateToPlan 方法转换。
Table API的也是基于Calcite实现的,除了无SQL转换为SqlNode这一步,整个过程非常类似。关于优化器优化Rule规则,可参考笔者之前的写的,Hive优化器原理与源码解析系列关于优化规则的文章,文末有讲解。
- 运行环境变量设置的并行度:
env.setParallelism(4)
ExecutionEnvironment变量setParallelism方法设置的并行度。
可给operators、data sources、data sinks设置统一的默认的parallelism并行度,即如果都没设置并行度,则默认使用ExecutionEnvironment变量设置的并行度。如果operators、data sources、data sinks自己有设置parallelism并行度,则会覆盖ExecutionEnvironment设置的并行度。
- 运行环境变量设置流时间特性:
- 事件时间(Event Time):即事件实际发生的时间。设置EventTime时间特性,必须指定如何生成 EventTime的水印,下面会讲到Watermark水印类型
- 处理时间(Processing Time):指事件被Flink处理时的系统时间。
- 进入时间(Ingestion Time):事件进入Flink流处理框架的时间。
如图:
- CheckPoint机制与参数设置
StateBackend stateBackend = new FsStateBackend("hdfs://tmp/hive/checkpoints");
env.setStateBackend(stateBackend);
CheckpointConfig config = env.getCheckpointConfig(); //设置CheckPoint配置
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//准确一次语义
config.setCheckpointInterval(60000);//设置checkPoint保存时间间隔
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置取消时,checkPoint处理方式
Flink CheckPoint机制
在讲Flink CheckPoint之前,先简单说下流引擎消费Kafka消息,将计算结果存放到Mysql场景,如何保证Exactly-Once的?
Spark CheckPoint实现:
仅存储Spark程序的状态CheckPoint将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方,但是不保证Kafka和Mysql的消费和存储准确一次的。就需要自己实现在实现消费Kafka端,需要手动提交偏移量。在持久化到Mysql端,需封装在一个事务算子内,并记录当前消费的偏移量。
Flink CheckPoint实现:
Flink 中实现的 Kafka 消费者是一个集成了CheckPoint机制的State Operator,保存了所有 Kafka 分区的读取偏移量。当一个checkPoint被触发时,每一个分区的偏移量都被存到了这个Checkpoint中。同样持久化Mysql的写入的偏移量也被保存到CheckPoint中的。它们存储的状态都是基于相同的输入数据。这样CheckPoint机制保证了所有 operator task 的存储状态都是一致的。当所有的 operator task 成功存储了它们的状态,一个CheckPoint才算完成。相当于把整个流程都封装成一个事务。从而保证excatly-once准确一次。
Flink CheckPoint机制是根据配置微流批地基于Stream中各个Operator的状态来生成Snapshot快照,从而将这些状态数据持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态不一致。
Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,
Barrier:
1)Barrier作为数据流的一部分随着记录被注入到数据流中。Barrier永远不会赶超通常的流记录,它会严格遵循顺序。
2)Barrier将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。
3)每一个Barrier携带着快照的ID,快照记录着ID并且将其放在快照数据的前面。
4)Barrier不会中断流处理,因此非常轻量级。
这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。
CheckPoint存储依赖StateBackend的实现,State状态在StateBackend存放有三种类型:
- MemoryStateBackend:
State Backend对State进行快照,作为CheckPoint发送到JobManager机器上作为Java对象保存在堆内,存储着key/value状态、window运算符、触发器等的哈希表。默认使用异步快照,避免阻塞管道MemoryStateBackend(MAX_MEM_STATE_SIZE, false),false表示禁用异步快照。默认地每个State大小限制不超过5MB,可以设置,但最大消息大小不能超过Akka的传输10MB。
- FsStateBackend:
使用可靠地文件存储系统State,如HDFS。
FsStateBackend将正在运行的数据保存在TaskManager的内存中。在CheckPoint时,它将State的快照写入文件系统对应的目录下的文件中。最小元数据存储在JobManager的内存中,高可用模式下,元数据存储在CheckPoint中。
FsStateBackend默认使用异步快照,避免阻塞处理的管道FsStateBackend(path,flase),false表示禁用异步快照,适用于具有大状态、长窗口大键值的高可用作业。
- RocksDBStateBackend:
使用RocksDB存储State。RocksDBStateBackend将正在运行的数据保存在RocksDB数据库中。该数据库存储在TaskManager数据目录中。
在CheckPoint时,整个RocksDB数据库将被CheckPoint带配置的文件系统对应的目录下。最小元数据存储在JobManager的内存中,RocksDBStateBackend始终执行异步快照。RocksDBStateBackend是目前唯一提供增量checkpoint的后端。
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//准确一次语义
Flink检查点支持的两种模式(CheckpointingMode
):
- EXACTLY_ONCE
准确一次,将对
operator
和udf
(user defined function
)进行快照:在恢复时,每条记录将在operator
状态中只被重现/重放一次。 - AT_LEAST_ONCE
至少一次,将以一种更简单地方式来对
operator
和udf
的状态进行快照:在失败后进行恢复时,在operator
的状态中,一些记录可能会被重放多次。
config.setCheckpointInterval(60000);
设置checkPoint保存时间间隔。为了降低 IO 成本,状态的保存必然是微批量(micro-batching)的而不是流式的,这会导致状态的保存总是落后于流计算进度,为了保证 exactly-once 流计算引擎实现了事务回滚。
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
CheckPoint设置取消时,checkPoint处理方式两种:
1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:
表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint。
2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:
表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint。
- FlinkKafka消费者参数设置
代码语言:javascript复制 List<String> topics = Arrays.asList("houyi","conf");
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "test1");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("session.timeout.ms", "30000");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//反序列化
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
topics,
new SimpleStringSchema(),
props
);
- 常用流式处理算子讲解
Map算子
map可以理解为映射,对每个元素进行一定的变换后,映射为另一个元素,可直接使用也可实现MapFunction类
代码语言:javascript复制DataStream<String> printTime = stream.map(v->"现在的时间:" v);
printTime.print();//打印测试
FlatMap算子
flatmap可理解为将元素摊平,每个元素可以变为0个、1个或者多个元素,实现FlatMap类对line查分统计个数
代码语言:javascript复制DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
counts.print();
public static final class LineSplitter implements FlatMapFunction<String,Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\W ");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
Filter算子
进行筛选只输出满足条件的,可直接使用,也可实现FilterFunction
代码语言:javascript复制DataStream<String> filter = stream.filter(new FilterCond());
//FilterCond集成FilterFunction
public static final class FilterCond implements FilterFunction<String>{
@Override
public boolean filter(String s) throws Exception {
int min = Integer.valueOf(s.substring(s.length()-2));
if(min % 10 == 0){
return true;
}else {
return false;
}
}
}
Join算子
流tuple15和流tuple10做等值关联t1.f0 = t2.f0,类似Inner Join只会返回在滚动窗口内关联上的消息流
代码语言:javascript复制DataStream<String> joinStream11 = tuple15.join(tuple10)
.where(t1->t1.f0).equalTo(t2->t2.f0)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(1)))
.apply((t1, t2)->t1.f1 "," t2.f1);
joinStream11.print();
Cogroup算子
关联两个流,关联不上的也保留下来。流tuple15和流tuple10做等值关联t1.f0 = t2.f0,但Cogroup在指定的窗口内,不管是否关联上,都会返回。
代码语言:javascript复制DataStream<String> joinStream = tuple15.coGroup(tuple10)
.where(t1->t1.f0).equalTo(t2->t2.f0)
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(1)))
.apply((new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<String> collector) throws Exception {
for (Tuple2<String, String> item : iterable) {
collector.collect(item.f1);
}
for (Tuple2<String, String> item : iterable1) {
collector.collect(item.f1);
}
}
})).filter(new FilterMod10());
joinStream.print();
Union算子
union可以将多个流合并到一个流中对多个流的水平拼接,参与合并的流必须是同一种类型。和SQL中Union一致
代码语言:javascript复制DataStream<String> unionStream = filter.union(printTime,currentTime);//可同时合并多个流
unionStream.print();
- Window窗口
Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,可以在这些桶上做计算操作。
- Window 是支持时间和数据驱动的
- 以时间为单位的 Time Window,如:每 30 秒钟、每 1 分钟等
- 以数据的数量为单位的 Count Window,如:每 100 个元素
- 通用的窗口模型:
- 滚动窗口(tumbling window,没有重叠)
- 滑动窗口(sliding window,有重叠)
- 会话窗口(session window,中间有一个不活动的间隙)
滚动窗口
滚动窗口分配器元素分配给固定窗口大小的窗口,窗口内元素不重叠。如window size为30秒,则滚动窗口分配器会每30秒执行计算,并启动新的窗口,如图:
基于时间的滚动窗口
代码语言:javascript复制DataStream<Tuple2<String,Integer>> keyByWindow0 = stream.map(s -> Tuple2.of(s.substring(s.length()-1),1))
.returns(Types.TUPLE(Types.STRING,Types.INT))//如果要用Lambda表示是,Tuple2是泛型,得用returns指定类型。
.keyBy(0)//使用指定第0个字段,即第一列进行哈希分区
.timeWindow(Time.seconds(30))//每30秒
.sum(1);
keyByWindow0.print();
基于数据个数滚动窗口
代码语言:javascript复制DataStream<Tuple2<String,Integer>> keyByWindow2 = stream.map(s -> Tuple2.of(s.substring(s.length()-1),1))
.returns(Types.TUPLE(Types.STRING,Types.INT))//如果要用Lambda表示是,Tuple2是泛型,得用returns指定类型。
.keyBy(0)//使用指定第0个字段,即第一列进行哈希分区
.countWindow(100)//每100个元素
.sum(1);
keyByWindow2.print();
滑动窗口
滑动窗口分配器同样地将元素分配给固定窗口大小的窗口,窗口内元素有可能重叠,有窗口大小和滑动频率两个参数。如窗口大小60秒,启动频率为30秒。则每30秒计算一次过去60秒的元素。
基于时间的滑动窗口
代码语言:javascript复制DataStream<Tuple2<String,Integer>> keyByWindow1 = stream.map(s -> Tuple2.of(s.substring(s.length()-1),1))
.returns(Types.TUPLE(Types.STRING,Types.INT))//如果要用Lambda表示是,Tuple2是泛型,得用returns指定类型。
.keyBy(0)//使用指定第0个字段,即第一列进行哈希分区
.timeWindow(Time.seconds(60),Time.seconds(30))
.sum(1);
keyByWindow1.print();
基于数据个数的滑动窗口
代码语言:javascript复制DataStream<Tuple2<String,Integer>> keyByWindow3 = stream.map(s -> Tuple2.of(s.substring(s.length()-1),1))
.returns(Types.TUPLE(Types.STRING,Types.INT))//如果要用Lambda表示是,Tuple2是泛型,得用returns指定类型。
.keyBy(0)//使用指定第0个字段,即第一列进行哈希分区
.countWindow(100,10)//每10个元素,对过去100个元素进行计算
.sum(1);
keyByWindow3.print();
会话窗口
与滚动窗口和滑动窗口不同的是,会话窗口不会重叠, 也没有固定的开始和结束时间。分配器通过活动会话分组元素的,如窗口不活动长度超过了定义会话间隔,则关闭当前会话,后续到的元素被分配到新的会话窗口。
代码语言:javascript复制DataStream<Tuple2<String,Integer>> keyByWindow4 = stream.map(s -> Tuple2.of(s.substring(s.length()-1),1))
.returns(Types.TUPLE(Types.STRING,Types.INT))//如果要用Lambda表示是,Tuple2是泛型,得用returns指定类型。
.keyBy(0)//使用指定第0个字段,即第一列进行哈希分区
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.sum(1);
keyByWindow3.print();
- Watermark水印
Watermark是用于处理乱序事件的,是基于事件时间Event Time并结合Window窗口来实现。流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但网络、处理压力等问题可能导致事件乱序。但是对于late element,我们又不能无限期的等下去,得有机制来保证一个特定的时间后,必须触发window去进行计算了。这个机制就是watermark水印。
设置水印有两种接口:
1)AssignerWithPeriodicWatermarks:周期性水印,周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中周期性的方式必须结合时间周期或积累条数两种方式周期性产生Watermark。
代码语言:javascript复制DataStream<String> waterMarkAscending = stream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
@Nullable
long currentMaxTimestamp = 0L;
long maxOutOfOrderness = 3000L;
Watermark watermark = null;
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return watermark;
}
@Override
public long extractTimestamp(String s, long l) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss:SSS");
Date date = null;
try {
date=sdf.parse(new JSONObject(s).get("timestamp").toString());
} catch (ParseException e) {
e.printStackTrace();
}
long timeStamp = date.getTime();
currentMaxTimestamp = Math.max(timeStamp,currentMaxTimestamp);
return timeStamp;
}
});
2)AssignerWithPunctuatedWatermarks:间断性水印,数据流中每个递增的EventTime都会产生一个Watermark。因为在每个事件上生成水印。然而,由于每一个水印都会引起下游的一些计算,则过多的水印会降低性能。
代码语言:javascript复制 DataStream<MyEvent> wMark = stream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
/**
* checkAndGetNextWatermark用于检查事件是否标点事件,若是则生成新的水位线。
* 不同于定期水位线定时调用getCurrentWatermark,标点水位线是每接受一个事件就需要调用checkAndGetNextWatermark,若返回值非 null 且新水位线大于当前水位线,则触发窗口计算。
* @param element
* @param previousElementTimestamp
* @return
*/
@Nullable
@Override
public Watermark checkAndGetNextWatermark(MyEvent element, long previousElementTimestamp) {
return element.getSequenceTimestamp();
}
/**
* 其中extractTimestamp用于从消息中提取事件时间,
* @param lastElement
* @param extractedTimestamp
* @return
*/
@Override
public long extractTimestamp(MyEvent lastElement, long extractedTimestamp) {
return lastElement.isEndOfSequence() ? new Watermark(extractedTimestamp) : null;
}
});
上述接口部分实现对AssignerWithPeriodicWatermarks周期性接口实现的有两种抽象类:
1)BoundedOutOfOrdernessTimestampExtractor允许固定数量延迟的分配器
相对于上述实现AssignerWithPeriodicWatermark接口需extractTimestamp提起消息时间时间戳方法和getCurrentWatermark()获取当前水印的两个方法,其现在源码中,已经实现了getCurrentWatermark(),用户自需要自己实现extractTimestamp提起消息时间时间戳方法逻辑即可。
代码语言:javascript复制DataStream<String> waterMarkOutOfOrderness = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
@Override
public long extractTimestamp(String s) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss:SSS");
Date date = null;
try {
date=sdf.parse(new JSONObject(s).get("timestamp").toString());
} catch (ParseException e) {
e.printStackTrace();
}
return date.getTime();//允许固定数量延迟的分配器,需要获取 String对象内实际时间戳
}
}).map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return new JSONObject(s).get("timestamp").toString();
}
});
2)AscendingTimestampExtractor递增时间戳分配器:
周期性水印生成最简单的特例是给定源任务看到的时间戳按升序出现的情况。在这种情况下,当前时间戳始终可以充当水印,因为不会到达较早的时间戳。请注意,只需要每个并行数据源任务的时间戳升序。例如,如果在特定设置中,一个并行数据源实例读取一个Kafka分区,则只需在每个Kafka分区内将时间戳升序。Flink的水印合并机制将在并行流被洗牌、联合、连接或合并时生成正确的水印
代码语言:javascript复制/**
A timestamp assigner and watermark generator for streams where timestamps are monotonously ascending.
In this case, the local watermarks for the streams are easy to generate, because they strictly follow the timestamps.
用于时间戳单调递增的流的时间戳分配器和水印生成器。在这种情况下,流的本地水印很容易生成,因为它们严格遵循时间戳。
*/
DataStream<String> ascendingTimeStamp = stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
@Override
public long extractAscendingTimestamp(String element) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss:SSS");
Date date = null;
try {
date=sdf.parse(new JSONObject(element).get("timestamp").toString());
} catch (ParseException e) {
e.printStackTrace();
}
long timeStamp = date.getTime();
return timeStamp;//返回时间本身的时间
}
});
- 写入HBase Sink实现
写入到HBase需要集成,需要继承RichSinkFunction抽象类
代码语言:javascript复制public abstract class RichSinkFunction<IN>
extends AbstractRichFunction
implements SinkFunction<IN>
A RichFunction
version of SinkFunction
.
抽象类继承需要实现open(),invoke(),close()方法。一般HBase连接及Table对象的获取由open方法实现,实际数据获取、逻辑处理、数据写入由invoke方法实现,访问完记得关闭连接信息close方法实现。
详细代码如下:
代码语言:javascript复制//实现HBase写入
public static final class HBaseSink extends RichSinkFunction<String> {
Connection conn;
Table table;
Configuration conf;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost:2181");
conn = ConnectionFactory.createConnection(conf);
table = conn.getTable(TableName.valueOf("dqc_check_result"));
}
@Override
public void invoke(String value, Context context) {
Put put = new Put(value.getBytes());
put.addColumn("completeness".getBytes(), "is_null".getBytes(), value.getBytes());
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void close() throws Exception {
if (table != null) {
table.close();
}
if (conn != null) {
conn.close();
}
}
}
总结
本篇文章从Kafka消息系统获取消息,Flink解析计算,并将计算结果储存到HBase场景为例,把Flink的知识点进行串联起来进行扩展讲解的,但限于篇幅有限,有些内容没法进行源码级详细展开,Flink非常有特性亮点的知识点,笔者会单独另写文章进行解析。