聊聊chronos的cancelMessage

2020-01-07 09:53:21 浏览数 (1)

本文主要研究一下chronos的cancelMessage

MqPullService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

代码语言:javascript复制
public class MqPullService implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);
​
    private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();
    private static final Batcher BATCHER = Batcher.getInstance();
    private volatile boolean shouldStop = false;
    private CountDownLatch cdl;
​
    private final List<Long> succOffsets = new ArrayList<>();
    private final List<Long> failOffsets = new ArrayList<>();
    private SimpleCarreraConsumer carreraConsumer;
    private String mqPullServiceName;
​
    private final int INTERNAL_PAIR_COUNT = 5000;
    private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);
​
    //......
​
    private void cancelMessage(final InternalKey internalKey, final String topic, final int action) {
        InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();
​
        if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);
​
            BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(),
                    new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);
​
            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);
​
            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);
​
            LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(),
                    internalKey.genUniqDelayMsgId());
        }
    }
​
    //......
}
  • cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

代码语言:javascript复制
public class InternalKey {
    private static final String SEPARATOR = "-";
    private static final int LEN_UUID = 36;
    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;
​
    private long timestamp;
    private int type;
    private long expire;
    private long times;
    private long timed;
    private long interval;
    private int innerTopicSeq;
    private String uuid;
    private int segmentNum;
    private int segmentIndex;
​
    //......
​
    public InternalKey cloneTombstoneInternalKey() {
        InternalKey tombstoneInternalKey = new InternalKey(this);
        tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue());
        return tombstoneInternalKey;
    }
​
    //......
}
  • cloneTombstoneInternalKey方法设置type为MsgTypes.TOMBSTONE.getValue()

CancelWrap

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java

代码语言:javascript复制
public class CancelWrap {
    private String uniqDelayMsgId;
    private String topic;
​
    public CancelWrap() {
    }
​
    public CancelWrap(String uniqDelayMsgId, String topic) {
        this.uniqDelayMsgId = uniqDelayMsgId;
        this.topic = topic;
    }
​
    public String getUniqDelayMsgId() {
        return uniqDelayMsgId;
    }
​
    public void setUniqDelayMsgId(String uniqDelayMsgId) {
        this.uniqDelayMsgId = uniqDelayMsgId;
    }
​
    public String getTopic() {
        return topic;
    }
​
    public void setTopic(String topic) {
        this.topic = topic;
    }
​
    public String toJsonString() {
        return JsonUtils.toJsonString(this);
    }
​
    @Override
    public String toString() {
        return "CancelWrap{"  
                "uniqDelayMsgId='"   uniqDelayMsgId   '''  
                ", topic='"   topic   '''  
                '}';
    }
}
  • CancelWrap定义了uniqDelayMsgId及topic两个属性

Batcher

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

代码语言:javascript复制
public class Batcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);
​
    private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();
    private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();
​
    private WriteBatch wb = new WriteBatch();
    private volatile int itemNum = 0;
    private static volatile Batcher instance = null;
​
    public static volatile ReentrantLock lock = new ReentrantLock();
​
    //......
​
    public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) {
        lock.lock();
        try {
            // 指数循环
            // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
​
            // 普通循环
            // 1536811267-3-1536911267-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            while (!KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {
                internalKey = internalKey.nextUniqDelayMsgId();
            }
​
            tombstoneInternalKey.setTimestamp(internalKey.getTimestamp());
            tombstoneInternalKey.setTimes(internalKey.getTimed()   2);
            tombstoneInternalKey.setTimed(internalKey.getTimed());
​
            if (!KeyUtils.isInvalidMsg(tombstoneInternalKey)) {
                putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(),
                        new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action);
            }
        } finally {
            lock.unlock();
        }
    }
​
    //......
}
  • putLoopTombstoneKey方法通过KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())寻找internalKey,然后通过putToDefaultCF添加一条CancelWrap记录

小结

cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

doc

  • carrera-chronos

0 人点赞