2021-10-08 11:45:31
浏览数 (1)
代码语言:javascript
复制package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/**
* 统计消息总数
*/
private final AtomicLong eventCount = new AtomicLong();
/**
* 生产一个BatchEventProcessor实例,并且启动独立线程开始获取和消费消息
* @param executorService
*/
private void addProcessor(ExecutorService executorService) {
// 准备一个匿名类,传给disruptor的事件处理类,
// 这样每次处理事件时,都会将已经处理事件的总数打印出来
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
};
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
ringBuffer.newBarrier(),
new StringEventHandler(eventCountPrinter));
// 将当前消费者的sequence实例传给ringBuffer
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
// 启动独立线程获取和消费事件
executorService.submit(batchEventProcessor);
}
@PostConstruct
private void init() {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);
// 创建多个消费者,并在独立线程中获取和消费事件
for (int i=0;i<CONSUMER_NUM;i ) {
addProcessor(executorService);
}
// 生产者
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
- 上述代码和前面的OneConsumerServiceImpl相比差别不大,主要是创建了多个BatchEventProcessor实例,然后分别在线程池中提交;
- 验证方法依旧是单元测试,在刚才的LowLeverOperateServiceImplTest.java中增加代码即可,注意testLowLevelOperateService的第三个参数是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示预期的被消费消息数为300:
代码语言:javascript
复制 @Autowired
@Qualifier("multiConsumer")
LowLevelOperateService multiConsumer;
@Test
public void testMultiConsumer() throws InterruptedException {
log.info("start testMultiConsumer");
testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
}