spark读取kafka数据流提供了两种方式createDstream和createDirectStream。 两者区别如下: 1、KafkaUtils.createDstream 构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上 A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量 B、对于不同的group和topic可以使用多个receivers创建不同的DStream C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER) 2.KafkaUtils.createDirectStream 区别Receiver接收数据,这种方式定期地从kafka的topic partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api 优点: A、 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。 B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中
C、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具
总结:
如果消费的消息精度不高,可以直接用createDstream
示例:
创建存储偏移量的表
CREATE TABLE `kafka_task` (
`id` bigint(11) NOT NULL AUTO_INCREMENT, `topic` varchar(100) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '主题', `group_id` varchar(50) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '消费组', `part` int(4) DEFAULT NULL COMMENT '分区', `offset` mediumtext CHARACTER SET utf8mb4 COMMENT '偏移量', `ctime` datetime DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`id`), UNIQUE KEY `topic_group_part` (`topic`,`group_id`,`part`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8 COMMENT='存偏移量'
配置资源文件connection.properties
jdbc.driver = com.mysql.jdbc.Driver jdbc.url = jdbc:mysql://xxxx:xxxx/xxx?useUnicode=true&characterEncoding=utf8 jdbc.user = xxx jdbc.password = xxx kafka.topics = test kafka.brokers = xxx:9093,xxx:9094 kafka.group = test
kafka.sec=10
import java.io.{BufferedInputStream, FileInputStream}
import java.text.SimpleDateFormat import java.util.{Date, Properties} import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils} import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} import scalikejdbc._ import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe /** * 手动控制sparkstream kafka偏移量 *偏移量存储到mysql上 * * **/ object DirectKafkaDemo { def getNowDate():String={ val now:Date=new Date() val dateFormat:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") dateFormat.format(now) } def main(args: Array[String]): Unit = {
//根据资源文件获取kafka配置
val prop=PropertiesLoad.apply()
val topic=prop.getProperty("kafka.topics") val broker=prop.getProperty("kafka.brokers") val group=prop.getProperty("kafka.group") //kafka资源配置 val topics=Set(topic) val kafkaParams=Map( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->broker, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer], ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"earliest", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG-> group, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG->"1000", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean) )
//spark配置,
val conf=new SparkConf().setAppName("DirectKafkaDemo").setMaster("local[3]") .set("spark.worker.timeout", "500") .set("spark.cores.max", "6") .set("spark.streaming.kafka.maxRatePerPartition", "500") .set("spark.rpc.askTimeout", "600s") .set("spark.network.timeout", "600s") .set("spark.streaming.backpressure.enabled", "true") .set("spark.task.maxFailures", "1") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //这个必须加上,不然通信报错 .set("spark.streaming.kafka.consumer.poll.ms", "4096")//这个值是你的数据量而定 val sc=new SparkContext(conf) val ssc=new StreamingContext(sc,Seconds(1))//时间间隔 //连接mysql ConnectionJdbc.apply() val fromOffsets=DB.readOnly(implicit session=> sql"""select topic, part, offset from kafka_task where group_id={group}, {osr.partition}, {osr.untilOffset},{ctime})""".update.apply() if(osr.partition == 0){ println(osr.partition, osr.untilOffset) } } } } } /** * 开始 */ ssc.start() ssc.awaitTermination() } } /** * 连接mysql */ object ConnectionJdbc { def apply(): Unit = { val prop=PropertiesLoad.apply() val driver=prop.getProperty("jdbc.driver") val host=prop.getProperty("jdbc.url") val user=prop.getProperty("jdbc.user") val password=prop.getProperty("jdbc.password") Class.forName(driver) ConnectionPool.singleton(host, user, password) } } /** * 连接资源文件 */ object PropertiesLoad{ def apply():Properties={ val in = getClass.getResourceAsStream("/connection.properties") val prop = new Properties() prop.load(new BufferedInputStream(in)) prop } }
参考:https://www.cnblogs.com/hd-zg/p/6188287.html