Kafka Stream(KStream) vs Apache Flink

2021-11-28 11:26:48 浏览数 (1)

原文翻译自 DZone,根据原文意译。

腾讯云流计算 Oceanus 是大数据实时化分析利器,兼容 Apache Flink 应用程序。新用户可以 1 元购买流计算 Oceanus(Flink) 集群,欢迎读者们体验使用。

概述

两个最流行和发展最快的流处理框架是 Flink(自 2015 年以来)和 Kafka 的 Stream API(自 2016 年以来在 Kafka v0.10 中)。两者都是从 Apache 开源的,并迅速取代了 Spark Streaming——该领域的传统领导者。

在本文中,我将通过代码示例分享这两种流处理方法之间的主要区别。关于这个主题的文章很少涉及高级差异,例如[1]、[2]和[3],但通过代码示例提供的信息并不多。

在这篇文章中,我将解决一个简单的问题,并尝试在两个框架中提供代码并进行比较。在开始写代码之前,以下是我开始学习KStream 时的总结。

KStream vs FlinkKStream vs Flink

示例 1

以下是本示例中的步骤:

  1. 从 Kafka 主题中读取数字流。这些数字是由“[”和“]”包围的字符串产生的。所有记录都使用相同的 Key 生成。
  2. 定义5秒间隔的翻滚窗口。
  3. Reduce 操作(在数字到达时附加数字)。
  4. 打印到控制台。

Kafka Stream 代码

代码语言:txt复制
static String TOPIC_IN = "Topic-IN";
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream(TOPIC_IN, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.reduce((value1, value2) -> value1   value2)
.toStream()
.print(Printed.toSysOut());
Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, props); 
streams.start();

Flink 代码

代码语言:txt复制
static String TOPIC_IN = "Topic-IN";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
kafkaConsumer.setStartFromLatest();
DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
stream
.timeWindowAll(Time.seconds(5))
.reduce(new ReduceFunction<KafkaRecord>() 
 {
   KafkaRecord result = new KafkaRecord();
   @Override
   public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
   {
     result.key = record1.key;  
     result.value = record1.value   record2.value;      
     return result;
   }
})
.print();        
System.out.println( env.getExecutionPlan() );
env.execute();

运行两者后观察到的差异

  1. 在 Kafka Stream 中在没有 groupByKey()的情况下不能使用window(); 而 Flink 提供了timeWindowAll()可以在没有 Key 的情况下处理流中所有记录的方法。
  2. Kafka Stream 默认读取记录及其键,但 Flink 需要自定义实现KafkaDeserializationSchema<T>来读取 Key 和Value。如果您对 Key 不感兴趣,那么您可以将其new SimpleStringSchema()用作FlinkKafkaConsumer<>构造函数的第二个参数。我的MySchema的实现可在 Github 上找到。
  3. 您可以打印两者的 pipeline 拓扑。这有助于优化您的代码。但是,除了 JSON 转储之外,Flink 还提供了一个 Web 应用程序来直观地查看拓扑 https://flink.apache.org/visualizer/。
  4. 在Kafka Stream中,我只能在调用 toStream() 后才能将结果打印到控制台,而 Flink 可以直接打印结果。
  5. 最后,Kafka Stream 花了 15 秒以上的时间将结果打印到控制台,而 Flink 是即时的。这对我来说看起来有点奇怪,因为它为开发人员增加了额外的延迟。

示例 2

以下是本例中的步骤

  1. 从 Kafka Topic 中读取数字流。这些数字是作为由“[”和“]”包围的字符串产生的。所有记录都使用相同的 Key 生成。
  2. 定义一个5秒的翻滚窗口。
  3. 定义 500 毫秒的延迟期以允许迟到。
  4. Reduce 操作(在数字到达时附加数字)。
  5. 将结果发送到另一个 Kafka Topic。

Kafka Stream 代码

代码语言:txt复制
static String TOPIC_IN = "Topic-IN";
static String TOPIC_OUT = "Topic-OUT";
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream(TOPIC_IN, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).grace(Duration.ofMillis(500)))
.reduce((value1, value2) -> value1   value2)
.toStream()
.to(TOPIC_OUT);
            
Topology topology = builder.build();    
final KafkaStreams streams = new KafkaStreams(topology, props); 
streams.start();

Flink 代码

代码语言:txt复制
static  String  TOPIC_IN  =  "Topic-IN" ;
static  String  TOPIC_OUT  =  "Topic-OUT" ;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
kafkaConsumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<KafkaRecord>() 
{
  @Override
  public long extractAscendingTimestamp(KafkaRecord record) 
  {
    return record.timestamp;
  }
});
// define kafka producer using Flink API.
KafkaSerializationSchema<String> serializationSchema = (value, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, value.getBytes());
FlinkKafkaProducer<String> kafkaProducer = 
                new FlinkKafkaProducer<String>(TOPIC_OUT, 
                                               serializationSchema, 
                                               prodProps, 
                                               Semantic.EXACTLY_ONCE);
DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
stream
.keyBy(record -> record.key)
.timeWindow(Time.seconds(5))
.allowedLateness(Time.milliseconds(500))  
.reduce(new ReduceFunction<String>()
{
  @Override
  public String reduce(String value1, String value2) throws Exception
  {
    return value1 value2;
  }
})
.addSink(kafkaProducer);
env.execute();

运行两者后观察到的差异

  1. 由于Kafka Stream 与 Kafka 的原生集成,所以在 KStream 中定义这个管道非常容易,Flink 相对来说复杂一点。
  2. 在 Flink 中,我不得不同时定义 Consumer 和 Producer,这就增加了额外的代码。
  3. KStream 自动使用记录中存在的时间戳(当它们被插入到 Kafka 中时),而 Flink 需要开发人员提供此信息。我认为未来可以改进 Flink 的 Kafka 连接器,以便开发人员可以编写更少的代码。 
  4. KStream 比 Flink 更容易处理延迟到达,但请注意,Flink 还提供了延迟到达的侧输出流(Side Output),这是 Kafka 流中没有的。
  5. 最后,在运行两者之后,我观察到 Kafka Stream 需要额外的几秒钟来写入输出主题,而 Flink 在计算时间窗口结果的那一刻将数据发送到输出主题非常快。

结论

  • 如果您的项目在源端和接收端都与 Kafka 紧密耦合,那么 KStream API 是更好的选择。但是,您需要管理和操作 KStream 应用程序的弹性。
  • Flink 是一个完整的流式计算系统,支持 HA、容错、自监控和多种部署模式。
  • 由于内置对多个第三方源的支持,并且 Sink Flink 对此类项目更有用。它可以轻松自定义以支持自定义数据源。
  • 与 Kafka Stream 相比,Flink 拥有更丰富的 API,并支持批处理、复杂事件处理(CEP)、FlinkML 和 Gelly(用于图形处理)。

0 人点赞