Kafka与Spark Streaming整合

2021-01-13 00:29:41 浏览数 (1)

Kafka与Spark Streaming整合

概述

Spark Streaming是一个可扩展,高吞吐,容错能力强的实时流式处理处理系统。一般的系统架构图是,数据从一个源点,经过Sparing Streaming处理,最后汇聚到一个系统。Spark Streaming的数据来源可以非常丰富,比如Kafka, Flume, Twitter, ZeroMQ, Kinesis 或者是任何的TCP sockets程序。对于数据的处理,Spark Streaming提供了非常丰富的高级api,例如map,redue,joini和窗口函数等等。数据处理完成后,可以存储到其他地方,比如文件系统,对象存储,数据库。典型的数据处理流程图:

概念简介

RDD:Resilient Distributed Datasets,弹性分部署数据集,支持两种操作:转换(transformation)从现有的数据集创建一个新的数据集;动作(actions)在数据集上运行计算后,返回一个值给驱动程序。在这里简单理解为某个时间片的数据集合即可。

DStream:和RDD概念有点类似,是RDD的集合,代表着整个数据流。简单来说Spark Streaming中的数据量就是DStream,然后每个时间片的数据就是RDD。

Kafka与Spark Streaming整合

整合方式

Kafka与Spark Streaming整合,首先需要从Kafka读取数据过来,读取数据有两种方式

  • 方法一:Receiver-based 这种方式使用一个Receiver接收Kafka的消息,如果使用默认的配置,存在丢数据的风险,因为这种方式会把从kafka接收到的消息存放到Spark的exectors,然后再启动streaming作业区处理,如果exectors挂了,那么消息可能就丢失了。可以通过开启Write Ahead Logs来保证数据的可靠性(Spark 1.2后开始支持),这种方式和大多数存储系统的Write Ahead Logs类似,Spark会把接收到的消息及kafka消息偏移存放到分布式文件系统中(例如HDFS),如果运行期间出现了故障,那么这些信息会被用于故障恢复。
  • 方法二:Direc 这种方式是Spark 1.3引入的,Spark会创建和Kafka partition一一对应的的RDD分区,然后周期性的去轮询获取分区信息,这种方式和Receier-based不一样的是,它不需要Write Ahead Logs,而是通过check point的机制记录kafka的offset,通过check point机制,保证Kafka中的消息不会被遗漏。这种方式相对于第一种方式有多种优点,一是天然支持并发,建了了和Kafka的partition分区对应的RDD分区,第二点是更高效,不需要write ahead logs,减少了写磁盘次数,第三种优点是可以支持exactly-once语义,通过checkpoint机制记录了kafka的offset,而不是通过zk或者kafka来记录offset能避免分布式系统中数据不一致的问题,从而能支持exactly-once语义,当然这里不是说这样就完全是exactly-once了,还需要消费端配合做消息幂等或事物处理。这种模式是较新的模式,推荐使用该模式,第一种方式已经逐步被淘汰。

整合示例

下面使用一个示例,展示如何整合Kafka和Spark Streaming,这个例子中,使用一个生产者不断往Kafka随机发送数字,然后通过Spark Streaming统计时间片段内数字之和。

1.往kafka随机发送数字代码:

代码语言:txt复制
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "spark-producer-demo-client");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String,String> producer = new KafkaProducer<>(properties);

        Random random = new Random();
        while (true) {
            int value = random.nextInt(10);
            ProducerRecord<String, String> message =
                    new ProducerRecord<>(topic, value "");
            producer.send(message, (recordMetadata, e) -> {
                if (recordMetadata != null) {
                    System.out.println(recordMetadata.topic()   "-"   recordMetadata.partition()   ":"  
                            recordMetadata.offset());
                }
            });
            TimeUnit.SECONDS.sleep(1);

2.提交到spark集群代码,用于统计2秒时间间隔的数字之和,这里我们使用的是Direct模式:

代码语言:txt复制
 def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]")
      .setAppName("StreamingWithKafka")
    val ssc = new StreamingContext(sparkConf, Seconds(2)) // 1
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false:java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, // 2
      Subscribe[String, String](List(topic), kafkaParams))
    val value = stream.map(record => {
      val intVal = Integer.valueOf(record.value())
      println(intVal)
      intVal
    }).reduce(_ _)

    value.print()

    ssc.start
    ssc.awaitTermination
  }

上面代码中1处的代码表示聚合处理的时间分片为2秒,计算两秒内的随机数之和。2处的代码用于指定spark执行器上面的kafka consumer分区分配策略,一共有三种类型,PreferConsistent是最常用的,表示订阅主题的分区均匀分配到执行器上面,然后还有PreferBrokers,这种机制是优先分配到和broker相同机器的执行器上,还有一种是PreferFixed,这种是手动配置,用的比较少。上面程序每次计算2秒时间间隔内的数字之和,输入会类似如下:

代码语言:txt复制
3
4
...

0 人点赞