事件处理类,收到事件后具体的业务处理逻辑

2021-09-26 17:40:38 浏览数 (1)

  • 事件生产类,定义如何将业务逻辑的事件转为disruptor事件发布到环形队列,用于消费:
代码语言:javascript复制
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {

    // 存储数据的环形队列
    private final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String content) {

        // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            StringEvent stringEvent = ringBuffer.get(sequence);
            // 空事件添加业务信息
            stringEvent.setValue(content);
        } finally {
            // 发布
            ringBuffer.publish(sequence);
        }
    }
}
  • 事件处理类,收到事件后具体的业务处理逻辑:
代码语言:javascript复制
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {

    public StringEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 这里延时100ms,模拟消费事件的逻辑的耗时
        Thread.sleep(100);

        // 如果外部传入了consumer,就要执行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}

0 人点赞