序
本文主要研究一下rocketmq的ExpressionForRetryMessageFilter
MessageFilter
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
代码语言:javascript复制public interface MessageFilter {
/**
* match by tags code or filter bit map which is calculated when message received
* and stored in consume queue ext.
*
* @param tagsCode tagsCode
* @param cqExtUnit extend unit of consume queue
*/
boolean isMatchedByConsumeQueue(final Long tagsCode,
final ConsumeQueueExt.CqExtUnit cqExtUnit);
/**
* match by message content which are stored in commit log.
* <br>{@code msgBuffer} and {@code properties} are not all null.If invoked in store,
* {@code properties} is null;If invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null.
*
* @param msgBuffer message buffer in commit log, may be null if not invoked in store.
* @param properties message properties, should decode from buffer if null by yourself.
*/
boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
final Map<String, String> properties);
}
- MessageFilter定义了isMatchedByConsumeQueue、isMatchedByCommitLog方法
ExpressionMessageFilter
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
代码语言:javascript复制public class ExpressionMessageFilter implements MessageFilter {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
protected final SubscriptionData subscriptionData;
protected final ConsumerFilterData consumerFilterData;
protected final ConsumerFilterManager consumerFilterManager;
protected final boolean bloomDataValid;
public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
ConsumerFilterManager consumerFilterManager) {
this.subscriptionData = subscriptionData;
this.consumerFilterData = consumerFilterData;
this.consumerFilterManager = consumerFilterManager;
if (consumerFilterData == null) {
bloomDataValid = false;
return;
}
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {
bloomDataValid = true;
} else {
bloomDataValid = false;
}
}
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
} else {
// no expression or no bloom
if (consumerFilterData == null || consumerFilterData.getExpression() == null
|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
return true;
}
// message is before consumer
if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
return true;
}
byte[] filterBitMap = cqExtUnit.getFilterBitMap();
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
if (filterBitMap == null || !this.bloomDataValid
|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
return true;
}
BitsArray bitsArray = null;
try {
bitsArray = BitsArray.create(filterBitMap);
boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
return ret;
} catch (Throwable e) {
log.error("bloom filter error, sub=" subscriptionData
", filter=" consumerFilterData ", bitMap=" bitsArray, e);
}
}
return true;
}
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
if (subscriptionData == null) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
return true;
}
ConsumerFilterData realFilterData = this.consumerFilterData;
Map<String, String> tempProperties = properties;
// no expression
if (realFilterData == null || realFilterData.getExpression() == null
|| realFilterData.getCompiledExpression() == null) {
return true;
}
if (tempProperties == null && msgBuffer != null) {
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " realFilterData ", " tempProperties, e);
}
log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
if (ret == null || !(ret instanceof Boolean)) {
return false;
}
return (Boolean) ret;
}
}
- ExpressionMessageFilter实现了MessageFilter接口,其isMatchedByConsumeQueue方法利用了bloomFilter进行判断;isMatchedByCommitLog方法主要是通过realFilterData.getCompiledExpression().evaluate(context)来获取结果
ExpressionForRetryMessageFilter
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
代码语言:javascript复制public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
ConsumerFilterManager consumerFilterManager) {
super(subscriptionData, consumerFilterData, consumerFilterManager);
}
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
if (subscriptionData == null) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {
return true;
}
ConsumerFilterData realFilterData = this.consumerFilterData;
Map<String, String> tempProperties = properties;
boolean decoded = false;
if (isRetryTopic) {
// retry topic, use original filter data.
// poor performance to support retry filter.
if (tempProperties == null && msgBuffer != null) {
decoded = true;
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
realFilterData = this.consumerFilterManager.get(realTopic, group);
}
// no expression
if (realFilterData == null || realFilterData.getExpression() == null
|| realFilterData.getCompiledExpression() == null) {
return true;
}
if (!decoded && tempProperties == null && msgBuffer != null) {
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " realFilterData ", " tempProperties, e);
}
log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
if (ret == null || !(ret instanceof Boolean)) {
return false;
}
return (Boolean) ret;
}
}
- ExpressionForRetryMessageFilter继承了ExpressionMessageFilter,它覆盖了isMatchedByCommitLog方法,里头会使用subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)来判断是否是isRetryTopic;对于retryTopic会使用tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC)来获取realTopic,从而根据consumerFilterManager.get(realTopic, group)获取realFilterData;最后通过realFilterData.getCompiledExpression().evaluate(context)来获取结果
小结
MessageFilter定义了isMatchedByConsumeQueue、isMatchedByCommitLog方法;ExpressionMessageFilter实现了MessageFilter接口,其isMatchedByConsumeQueue方法利用了bloomFilter进行判断;isMatchedByCommitLog方法主要是通过realFilterData.getCompiledExpression().evaluate(context)来获取结果;ExpressionForRetryMessageFilter继承了ExpressionMessageFilter,它覆盖了isMatchedByCommitLog方法,里头会使用subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)来判断是否是isRetryTopic
doc
- ExpressionForRetryMessageFilter