原文翻译自 DZone,根据原文意译。
腾讯云流计算 Oceanus 是大数据实时化分析利器,兼容 Apache Flink 应用程序。新用户可以 1 元购买流计算 Oceanus(Flink) 集群,欢迎读者们体验使用。
概述
两个最流行和发展最快的流处理框架是 Flink(自 2015 年以来)和 Kafka 的 Stream API(自 2016 年以来在 Kafka v0.10 中)。两者都是从 Apache 开源的,并迅速取代了 Spark Streaming——该领域的传统领导者。
在本文中,我将通过代码示例分享这两种流处理方法之间的主要区别。关于这个主题的文章很少涉及高级差异,例如[1]、[2]和[3],但通过代码示例提供的信息并不多。
在这篇文章中,我将解决一个简单的问题,并尝试在两个框架中提供代码并进行比较。在开始写代码之前,以下是我开始学习KStream 时的总结。
示例 1
以下是本示例中的步骤:
- 从 Kafka 主题中读取数字流。这些数字是由“[”和“]”包围的字符串产生的。所有记录都使用相同的 Key 生成。
- 定义5秒间隔的翻滚窗口。
- Reduce 操作(在数字到达时附加数字)。
- 打印到控制台。
Kafka Stream 代码
代码语言:txt复制static String TOPIC_IN = "Topic-IN";
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream(TOPIC_IN, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.reduce((value1, value2) -> value1 value2)
.toStream()
.print(Printed.toSysOut());
Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Flink 代码
代码语言:txt复制static String TOPIC_IN = "Topic-IN";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
kafkaConsumer.setStartFromLatest();
DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
stream
.timeWindowAll(Time.seconds(5))
.reduce(new ReduceFunction<KafkaRecord>()
{
KafkaRecord result = new KafkaRecord();
@Override
public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
{
result.key = record1.key;
result.value = record1.value record2.value;
return result;
}
})
.print();
System.out.println( env.getExecutionPlan() );
env.execute();
运行两者后观察到的差异
- 在 Kafka Stream 中在没有
groupByKey()
的情况下不能使用window()
; 而 Flink 提供了timeWindowAll()
可以在没有 Key 的情况下处理流中所有记录的方法。 - Kafka Stream 默认读取记录及其键,但 Flink 需要自定义实现
KafkaDeserializationSchema<T>
来读取 Key 和Value。如果您对 Key 不感兴趣,那么您可以将其new SimpleStringSchema()
用作FlinkKafkaConsumer<>
构造函数的第二个参数。我的MySchema
的实现可在 Github 上找到。 - 您可以打印两者的 pipeline 拓扑。这有助于优化您的代码。但是,除了 JSON 转储之外,Flink 还提供了一个 Web 应用程序来直观地查看拓扑 https://flink.apache.org/visualizer/。
- 在Kafka Stream中,我只能在调用
toStream()
后才能将结果打印到控制台,而 Flink 可以直接打印结果。 - 最后,Kafka Stream 花了 15 秒以上的时间将结果打印到控制台,而 Flink 是即时的。这对我来说看起来有点奇怪,因为它为开发人员增加了额外的延迟。
示例 2
以下是本例中的步骤
- 从 Kafka Topic 中读取数字流。这些数字是作为由“[”和“]”包围的字符串产生的。所有记录都使用相同的 Key 生成。
- 定义一个5秒的翻滚窗口。
- 定义 500 毫秒的延迟期以允许迟到。
- Reduce 操作(在数字到达时附加数字)。
- 将结果发送到另一个 Kafka Topic。
Kafka Stream 代码
代码语言:txt复制static String TOPIC_IN = "Topic-IN";
static String TOPIC_OUT = "Topic-OUT";
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream(TOPIC_IN, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).grace(Duration.ofMillis(500)))
.reduce((value1, value2) -> value1 value2)
.toStream()
.to(TOPIC_OUT);
Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Flink 代码
代码语言:txt复制static String TOPIC_IN = "Topic-IN" ;
static String TOPIC_OUT = "Topic-OUT" ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
kafkaConsumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<KafkaRecord>()
{
@Override
public long extractAscendingTimestamp(KafkaRecord record)
{
return record.timestamp;
}
});
// define kafka producer using Flink API.
KafkaSerializationSchema<String> serializationSchema = (value, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, value.getBytes());
FlinkKafkaProducer<String> kafkaProducer =
new FlinkKafkaProducer<String>(TOPIC_OUT,
serializationSchema,
prodProps,
Semantic.EXACTLY_ONCE);
DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
stream
.keyBy(record -> record.key)
.timeWindow(Time.seconds(5))
.allowedLateness(Time.milliseconds(500))
.reduce(new ReduceFunction<String>()
{
@Override
public String reduce(String value1, String value2) throws Exception
{
return value1 value2;
}
})
.addSink(kafkaProducer);
env.execute();
运行两者后观察到的差异
- 由于Kafka Stream 与 Kafka 的原生集成,所以在 KStream 中定义这个管道非常容易,Flink 相对来说复杂一点。
- 在 Flink 中,我不得不同时定义 Consumer 和 Producer,这就增加了额外的代码。
- KStream 自动使用记录中存在的时间戳(当它们被插入到 Kafka 中时),而 Flink 需要开发人员提供此信息。我认为未来可以改进 Flink 的 Kafka 连接器,以便开发人员可以编写更少的代码。
- KStream 比 Flink 更容易处理延迟到达,但请注意,Flink 还提供了延迟到达的侧输出流(Side Output),这是 Kafka 流中没有的。
- 最后,在运行两者之后,我观察到 Kafka Stream 需要额外的几秒钟来写入输出主题,而 Flink 在计算时间窗口结果的那一刻将数据发送到输出主题非常快。
结论
- 如果您的项目在源端和接收端都与 Kafka 紧密耦合,那么 KStream API 是更好的选择。但是,您需要管理和操作 KStream 应用程序的弹性。
- Flink 是一个完整的流式计算系统,支持 HA、容错、自监控和多种部署模式。
- 由于内置对多个第三方源的支持,并且 Sink Flink 对此类项目更有用。它可以轻松自定义以支持自定义数据源。
- 与 Kafka Stream 相比,Flink 拥有更丰富的 API,并支持批处理、复杂事件处理(CEP)、FlinkML 和 Gelly(用于图形处理)。