序
本文主要研究一下MqPullService的cancelMessage
MqPullService
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java
代码语言:javascript复制public class MqPullService implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);
private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();
private static final Batcher BATCHER = Batcher.getInstance();
private volatile boolean shouldStop = false;
private CountDownLatch cdl;
private final List<Long> succOffsets = new ArrayList<>();
private final List<Long> failOffsets = new ArrayList<>();
private SimpleCarreraConsumer carreraConsumer;
private String mqPullServiceName;
private final int INTERNAL_PAIR_COUNT = 5000;
private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);
//......
private void cancelMessage(final InternalKey internalKey, final String topic, final int action) {
InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();
if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);
BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(),
new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);
} else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);
BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
} else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);
BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
} else {
MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);
LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(),
internalKey.genUniqDelayMsgId());
}
}
//......
}
- cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)
InternalKey
DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java
代码语言:javascript复制public class InternalKey {
private static final String SEPARATOR = "-";
private static final int LEN_UUID = 36;
private static final long ONE_DAY_SECONDS = 24 * 60 * 60;
private long timestamp;
private int type;
private long expire;
private long times;
private long timed;
private long interval;
private int innerTopicSeq;
private String uuid;
private int segmentNum;
private int segmentIndex;
//......
public InternalKey cloneTombstoneInternalKey() {
InternalKey tombstoneInternalKey = new InternalKey(this);
tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue());
return tombstoneInternalKey;
}
//......
}
- cloneTombstoneInternalKey方法设置type为MsgTypes.TOMBSTONE.getValue()
CancelWrap
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java
代码语言:javascript复制public class CancelWrap {
private String uniqDelayMsgId;
private String topic;
public CancelWrap() {
}
public CancelWrap(String uniqDelayMsgId, String topic) {
this.uniqDelayMsgId = uniqDelayMsgId;
this.topic = topic;
}
public String getUniqDelayMsgId() {
return uniqDelayMsgId;
}
public void setUniqDelayMsgId(String uniqDelayMsgId) {
this.uniqDelayMsgId = uniqDelayMsgId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String toJsonString() {
return JsonUtils.toJsonString(this);
}
@Override
public String toString() {
return "CancelWrap{"
"uniqDelayMsgId='" uniqDelayMsgId '''
", topic='" topic '''
'}';
}
}
- CancelWrap定义了uniqDelayMsgId及topic两个属性
Batcher
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java
代码语言:javascript复制public class Batcher {
private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);
private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();
private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();
private WriteBatch wb = new WriteBatch();
private volatile int itemNum = 0;
private static volatile Batcher instance = null;
public static volatile ReentrantLock lock = new ReentrantLock();
//......
public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) {
lock.lock();
try {
// 指数循环
// 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
// 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
// 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
// 普通循环
// 1536811267-3-1536911267-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
while (!KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {
internalKey = internalKey.nextUniqDelayMsgId();
}
tombstoneInternalKey.setTimestamp(internalKey.getTimestamp());
tombstoneInternalKey.setTimes(internalKey.getTimed() 2);
tombstoneInternalKey.setTimed(internalKey.getTimed());
if (!KeyUtils.isInvalidMsg(tombstoneInternalKey)) {
putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(),
new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action);
}
} finally {
lock.unlock();
}
}
//......
}
- putLoopTombstoneKey方法通过KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())寻找internalKey,然后通过putToDefaultCF添加一条CancelWrap记录
小结
cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)
doc
- carrera-chronos