创建了多个实例,然后分别在线程池中提交

2021-10-08 11:45:31 浏览数 (1)

  • 代码如下,有几处要注意的地方稍后提到:
代码语言:javascript复制
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    /**
     * 生产一个BatchEventProcessor实例,并且启动独立线程开始获取和消费消息
     * @param executorService
     */
    private void addProcessor(ExecutorService executorService) {
        // 准备一个匿名类,传给disruptor的事件处理类,
        // 这样每次处理事件时,都会将已经处理事件的总数打印出来
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
                ringBuffer,
                ringBuffer.newBarrier(),
                new StringEventHandler(eventCountPrinter));

        // 将当前消费者的sequence实例传给ringBuffer
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        // 启动独立线程获取和消费事件
        executorService.submit(batchEventProcessor);
    }

    @PostConstruct
    private void init() {
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);

        // 创建多个消费者,并在独立线程中获取和消费事件
        for (int i=0;i<CONSUMER_NUM;i  ) {
            addProcessor(executorService);
        }

        // 生产者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  • 上述代码和前面的OneConsumerServiceImpl相比差别不大,主要是创建了多个BatchEventProcessor实例,然后分别在线程池中提交;
  • 验证方法依旧是单元测试,在刚才的LowLeverOperateServiceImplTest.java中增加代码即可,注意testLowLevelOperateService的第三个参数是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示预期的被消费消息数为300
代码语言:javascript复制
 	@Autowired
    @Qualifier("multiConsumer")
    LowLevelOperateService multiConsumer;

    @Test
    public void testMultiConsumer() throws InterruptedException {
        log.info("start testMultiConsumer");
        testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
    }

0 人点赞