记录一下工作中可能用的到的FlinkAPI:
4.6Kafka Source
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/index.html
4.6.1API及其版本
Flink 里已经提供了一些绑定的 Connector,例如 Kafka Source 和 Sink,Elasticsearch Sink 等。读写 Kafka、ES、RabbitMQ 时可以直接使用相应 connector 的 API 即可,虽然该部分是Flink 项目源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
4.6.2参数设置
以下参数都必须/建议设置1.订阅的主题:topic 2.反序列化规则:deserialization 3.消费者属性-集群地址:bootstrap.servers 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理):groupId 5.消费者属性-offset重置规则,如earliest/latest…:offset 6.动态分区检测:dynamic partition detection
4.6.3Kafka命令 启动Kafka和Zookeeper命令,针对讲师提供虚拟机:
代码语言:javascript复制zookeeper-daemon.sh start
kafka-daemon.sh start
●查看当前服务器中的所有topic
代码语言:javascript复制/export/server/kafka/bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092
●创建topic
代码语言:javascript复制/export/server/kafka/bin/kafka-topics.sh --create --topic flink-topic
--bootstrap-server node1.itcast.cn:9092 --replication-factor 1 --partitions 3
●查看某个Topic的详情
代码语言:javascript复制/export/server/kafka/bin/kafka-topics.sh --describe --topic flink-topic
--bootstrap-server node1.itcast.cn:9092
●删除topic
代码语言:javascript复制/export/server/kafka/bin/kafka-topics.sh --delete --topic flink-topic
--bootstrap-server node1.itcast.cn:9092
●发送消息
代码语言:javascript复制/export/server/kafka/bin/kafka-console-producer.sh --topic flink-topic
--broker-list node1.itcast.cn:9092
●消费消息
代码语言:javascript复制/export/server/kafka/bin/kafka-console-consumer.sh --topic flink-topic
--bootstrap-server node1.itcast.cn:9092 --from-beginning
●修改分区
代码语言:javascript复制/export/server/kafka/bin/kafka-topics.sh --alter --topic flink-topic
--bootstrap-server node1.itcast.cn:9092 --partitions 4
4.6.4代码实现 Flink 实时从Kafka消费数据,底层调用Kafka New Consumer API,演示案例代码如下:
代码语言:javascript复制package cn.itcast.flink.source.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs; import java.util.Properties;
public class StreamSourceKafkaDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3) ;
// 2. 数据源-source:从Kafka 消费数据
// a. Kafka Consumer消费者配置属性设置Properties props = new Properties() ;
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); props.setProperty("group.id", "test-1001");
// b. 创建FlinkKafkaConsumer对象
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( "flink-topic", // Topic 名称
new SimpleStringSchema(), // props //
) ;
// c. 添加数据源
DataStreamSource<String> kafkaDataStream = env.addSource(kafkaConsumer);
// 3. 数 据 终 端 -sink kafkaDataStream.printToErr();
// 4. 触 发 执 行 -execute env.execute(StreamSourceKafkaDemo.class.getSimpleName()) ;
}
}
4.6.5Kafka 消费起始位置 Kafka 可以被看成一个无限的流,里面的流数据是短暂存在的,如果不消费,消息就过期滚动没了。涉及一个问题:如果开始消费,就要定一下从什么位置开始。
第一、earliest:从最起始位置开始消费,当然不一定是从0开始,因为如果数据过期就清掉了,所以可以理解为从现存的数据里最小位置开始消费; 第二、latest:从最末位置开始消费; 第三、per-partition assignment:对每个分区都指定一个offset,再从offset位置开始消费;
默认情况下,从Kafka消费数据时,采用的是:latest,最新偏移量开始消费数据。 在Flink Kafka Consumer 库中,允许用户配置从每个分区的哪个位置position开始消费数 据,具体说明如下所示: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
在代码中设置消费数据起始位置相关API如下所示:
案例演示代码:
代码语言:javascript复制package cn.itcast.flink.source.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.kafka.clients.CommonClientConfigs;
import java.util.HashMap; import java.util.Map;
import java.util.Properties;
public class StreamSourceKafkaOffsetDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3) ;
// 2. 数据源-source:从Kafka 消费数据
// a. Kafka Consumer消费者配置属性设置Properties props = new Properties() ;
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); props.setProperty("group.id", "test-1001");
// b. 创建FlinkKafkaConsumer对象
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( "flink-topic", // Topic 名称
new SimpleStringSchema(), // props //
) ;
package cn.itcast.flink.source.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.kafka.clients.CommonClientConfigs;
import java.util.HashMap; import java.util.Map;
import java.util.Properties;
public class StreamSourceKafkaOffsetDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3) ;
// 2. 数据源-source:从Kafka 消费数据
// a. Kafka Consumer消费者配置属性设置Properties props = new Properties() ;
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092"); props.setProperty("group.id", "test-1001");
// b. 创建FlinkKafkaConsumer对象
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>( "flink-topic", // Topic 名称
new SimpleStringSchema(), // props //
) ;
注意:开启 checkpoint 时 offset 是 Flink 通过状态 state 管理和恢复的,并不是从 kafka 的offset 位置恢复。在 checkpoint 机制下,作业从最近一次checkpoint 恢复,本身是会回放部分历史数据,导致部分数据重复消费,Flink 引擎仅保证计算状态的精准一次,要想做到端到端精准一次需要依赖一些幂等的存储系统或者事务操作。
4.6.6Kafka 分区发现 实际的生产环境中可能有这样一些需求,比如: 场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。
场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 6 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 6 个扩容到 12。该情况下如何在不重启作业情况下动态感知新扩容的 partition?
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka- consumers-topic-and-partition-discovery
针对上面的两种场景,首先在构建 FlinkKafkaConsumer 时的 properties 中设置flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,及设置的时间间隔。此时FlinkKafkaConsumer内部会启动一个单独的线程定期去Kafka获取最新的meta信息。 针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。
针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。