Kafka扩展内容

2022-10-25 16:00:28 浏览数 (2)

Kafka扩展

  • 一、重复消费配置
  • 二、拦截器
  • 三、Kafka Streams
  • 四、Kafka与Flume比较

一、重复消费配置

代码语言:javascript复制
# 低级API:
props.put("group.id","01");
# offset自动重置,offset可能因为缓存删除,序号不一定从0开始
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

# 高级API:
consumer.seek(new TopicPartition(topic, partititon, offset));

消费者组id、topic和partition唯一确定一个offset。

可以查看_consumer_offsets这个Topic里的数据。

二、拦截器

Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor。

onSend(ProducerRecord)方法在消息被序列号及计算分区之前调用。 onAcknowledgement(RecordMetadata, Exception)在消息被应答或发送失败时调用。

代码语言:javascript复制
// 配置文件添加拦截器链
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, list);
    
public class Interceptor implements ProducerInterceptor<String, String> {
    private int SUCCESS_CONT = 0;
    private int ERROR_CONT = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<>(record.topic(), record.key(), "new value");
        // return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            SUCCESS_CONT  ;
        } else {
            ERROR_CONT  ;
        }
    }

    @Override
    public void close() {
        System.out.println("success times:"   SUCCESS_CONT);
        System.out.println("error times:"   ERROR_CONT);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

三、Kafka Streams

轻量级(功能性弱) ,实时性(非微批次处理,窗口允许乱序数据,允许数据迟到),一条条数据处理。

代码语言:javascript复制
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>${kafka.version}</version>
</dependency>
代码语言:javascript复制
// Kafka Streams处理案例
public class Streams {
    public static void main(String[] args) {
        // 创建拓扑对象
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // 构建拓扑结构
        topologyBuilder.addSource("SOURCE", "first")
                .addProcessor("PROCESSOR", () -> new MyProcessor(){}, "SOURCE")
                .addSink("SINK", "second", "PROCESSOR");
                
        // 创建配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092");
        props.put("application.id", "kafkaStreams");

        // 创建Kafka Streams对象
        KafkaStreams kafkaStreams = new KafkaStreams(topologyBuilder, props);

        // 开启流处理
        kafkaStreams.start();
    }
}


// TopologyBuilder.addProcessor中的ProssorSupplier返回的Processor类这里做定制
public class MyProcessor implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    // 初始化,上下文
    @Override
    public void init(ProcessorContext processorContext) {
        context = processorContext;
    }

    // 处理逻辑
    @Override
    public void process(byte[] bytes, byte[] bytes2) {
        // 获取一行数据
        String line = new String(bytes2);

        // 逻辑处理
        line = line.replaceAll(">>>", "");
        bytes2 = line.getBytes();

        // 写出数据
        context.forward(bytes, bytes2);
    }

    @Override
    public void punctuate(long l) {

    }

    @Override
    public void close() {

    }
}

四、Kafka与Flume比较

数据传输层。

Flume:cloudera公司研发,适合多个生产者,适合下游数据消费者不多的情况(费内存),适合数据安全性要求不高的操作,适合与Hadoop生态圈对接的操作。

Kafka:linkedin公司研发,适合数据下游消费众多的情况(缓存数据跟消费者个数无关),适合数据安全性较高的操作(数据在磁盘,备份),支持relication。 多个Agent后台数据,交由一个Agent汇总,对接Kafka,离线/实时两条线消费。

0 人点赞