当使用kakfa作为sparkStreaming 的数据源时有两种对接方式: reciver 与 direct
1. reciver
reciver 方式是sparkStreaming数据源的标准使用方式,会单独开启reciver进程从数据源中获取数据,kafka reciver使用high level api从kafka 中拉取数据,并且每个批次生成batchInterval / spark.streaming.blockInterval 个分区的RDD(batchInterval 表示批次时间,spark.streaming.blockInterval 表示生成一个RDD分区的时间), 因此kafka topic partition的个数与RDD partition个数没有任何关系。数据源端的可靠性保证可通过两种方式实现:内存副本与WAL, 从kafka拉取的数据会默认序列化的方式存储在内存中与磁盘,为了防止reciver所在executor挂掉,提高其可靠性可使用双副本方式,分别储存在两个不同的executor中,再者两个存储数据的executor都挂掉,可开启WAL即预写日志机制,将批次的数据存储在hdfs上,通过hdfs的容错性保证数据源的容错性。
2. direct
direct 方式使用simple level api的方式从kafka 拉取数据,kafka simple api 不同于high api需要自动维护offset决定从kakfa拉取的数据量,动态感知kafka topic partition增加,并且kafka topic 的partition个数与RDD partition个数相同,sparkStreaming 会将每个批次的offset 范围保存在元数据中,配合使用checkpoint机制将元数据保存在hdfs上保证数据源的可靠性,与reciver方式相比较代价更低。
反压机制比对:反压机制是指下游数据处理过慢或者过快如何调整上游数据源的生产速率
reciver 方式按照一定的数据大小从kafka中拉取数据,若该批次处理时间大于设置的batchInterval 那么就会导致任务堆积,若该批次处理时间小于batchInterval那么就会导致资源空闲浪费无法得到充分利用;direct 方式由于自身维护offset从kafka 中获取数据,可根据过往批次的数据处理时间合理调整从kafka拉取数据的offset范围,若批次时间过长减少拉取的数据量,若批次时间过短增加拉取的数据量,既保证不会有任务堆积又保证资源的充分利用
offset管理比对:
reciver 方式不需要手动管理offset, 在kafka的配置参数可使用enable.auto.commit 开启offset的提交方式,若为ture,将按照一定的时间间隔提交offset到kafka中, 若为false需要手动提交offset, 否则将在kafka管理平台无法监控topic的消费速率,起始消费位置可通过auto.offset.reset,若为smallest 则按照topic的最开始位置或者最高group id提交的offset位置消费,若为 largest则按照最终位置或者group id最近提交的offset位置消费。
direct 方式自动管理offset 那么是不会将offset提交到kafka中去的,需要手动提交:
起始消费位置除了可通过auto.offset.reset指定,也可手动指定offset从kafka中拉取数据:
综合以上比对,实际中使用direct方式对接kafka。
端到端的exectly once语义实现
流式处理系统中很重的一个指标就是消费语义实现,从数据源到数据处理过程再到处理结果的数据如何保证每条数据恰好精确被
处理一次对于实时计费、实时指标统计是一个很重要的标准。 总所周知的消费语义有三种:
1. 至多一次,数据处理失败或者输出结果失败数据源不会重新回放该数据源
2. 至少一次,对于维度叠加统计数据,如果在数据输出一半过程中失败,那么任务重新执行会导致输出的数据多次叠加统计
3. 精确一次,无论任务任务失败或者数据重新输出,对结果影响效果不变
由于sparkStreaming数据源、任务处理都是支持失败重试机制因此保证了至少一次的消费语义,那么如何保证输出端的 精确一次处理,提供以下两种方式:
a. 使用支持事务的数据库作为输出端的接收源(mysql),将需要输出的数据拉取到dirver端,开始事务方式,将结果推送到mysql中,提交中途出现失败,事务回滚取消数据提交,任务重新执行将不会对数据库产生影响。
如果在executor端执行数据提交,每个executor只是提交部分数据,可能存在部分成功,部分失败导致任务重跑,那么还将是至少一次的语义;
b. 对于不支持事务的接收源(reids/hbase),可借助offset实现精确一次消费语义,由于批次之间按照严格的顺序执行并且批次任务失败重新执行的offset不变,那么可在在每个批次数据保存的同时将该批次的 offset保存起来,下批次更新数据首先判断当前保存的offset 与当前批次的offset 是否一致,若一致则不更新数据,若不一致则更新数据,以保证精确一次的消费语义。此方式即可在driver端执行也可在executor段执行,需要注意的是需要在批次任务中将维度数据进行叠加输出,而不是在批次中维度部分叠加然后更新,这种方式同样会导致至少一次的消费语义。
c. 另外一种实现exectly once 通过业务上实现,即输出数据中存在唯一字段或者联合唯一字段,与数据库中现有的值进行比较,存在则插入否则不执行。