Kafka扩展
- 一、重复消费配置
- 二、拦截器
- 三、Kafka Streams
- 四、Kafka与Flume比较
一、重复消费配置
代码语言:javascript复制# 低级API:
props.put("group.id","01");
# offset自动重置,offset可能因为缓存删除,序号不一定从0开始
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
# 高级API:
consumer.seek(new TopicPartition(topic, partititon, offset));
消费者组id、topic和partition唯一确定一个offset。
可以查看_consumer_offsets这个Topic里的数据。
二、拦截器
Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。
onSend(ProducerRecord)方法在消息被序列号及计算分区之前调用。 onAcknowledgement(RecordMetadata, Exception)在消息被应答或发送失败时调用。
代码语言:javascript复制// 配置文件添加拦截器链
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, list);
public class Interceptor implements ProducerInterceptor<String, String> {
private int SUCCESS_CONT = 0;
private int ERROR_CONT = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<>(record.topic(), record.key(), "new value");
// return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
SUCCESS_CONT ;
} else {
ERROR_CONT ;
}
}
@Override
public void close() {
System.out.println("success times:" SUCCESS_CONT);
System.out.println("error times:" ERROR_CONT);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
三、Kafka Streams
轻量级(功能性弱) ,实时性(非微批次处理,窗口允许乱序数据,允许数据迟到),一条条数据处理。
代码语言:javascript复制<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
代码语言:javascript复制// Kafka Streams处理案例
public class Streams {
public static void main(String[] args) {
// 创建拓扑对象
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 构建拓扑结构
topologyBuilder.addSource("SOURCE", "first")
.addProcessor("PROCESSOR", () -> new MyProcessor(){}, "SOURCE")
.addSink("SINK", "second", "PROCESSOR");
// 创建配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop01:9092");
props.put("application.id", "kafkaStreams");
// 创建Kafka Streams对象
KafkaStreams kafkaStreams = new KafkaStreams(topologyBuilder, props);
// 开启流处理
kafkaStreams.start();
}
}
// TopologyBuilder.addProcessor中的ProssorSupplier返回的Processor类这里做定制
public class MyProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
// 初始化,上下文
@Override
public void init(ProcessorContext processorContext) {
context = processorContext;
}
// 处理逻辑
@Override
public void process(byte[] bytes, byte[] bytes2) {
// 获取一行数据
String line = new String(bytes2);
// 逻辑处理
line = line.replaceAll(">>>", "");
bytes2 = line.getBytes();
// 写出数据
context.forward(bytes, bytes2);
}
@Override
public void punctuate(long l) {
}
@Override
public void close() {
}
}
四、Kafka与Flume比较
数据传输层。
Flume:cloudera公司研发,适合多个生产者,适合下游数据消费者不多的情况(费内存),适合数据安全性要求不高的操作,适合与Hadoop生态圈对接的操作。
Kafka:linkedin公司研发,适合数据下游消费众多的情况(缓存数据跟消费者个数无关),适合数据安全性较高的操作(数据在磁盘,备份),支持relication。 多个Agent后台数据,交由一个Agent汇总,对接Kafka,离线/实时两条线消费。