样例
代码语言:javascript复制DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("organization");
consumer.setNamesrvAddr("172.22.0.64:9876"); // NAME_SERVER地址
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 从哪个位置开始消费消息
consumer.subscribe("my-topic", "*"); // 订阅主题
consumer.registerMessageListener(new MessageListenerOrderly() { // 注册消息监听(顺序方式)
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
主要参数
- ConsumeFromWhere 控制新的消费者组从哪个位置开始消费
枚举值 | 效果 |
---|---|
CONSUME_FROM_LAST_OFFSET | 从最新的消息开始消费 |
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST | 废弃,效果同上 |
CONSUME_FROM_MIN_OFFSET | 废弃,效果同上 |
CONSUME_FROM_MAX_OFFSET | 废弃 ,效果同上 |
CONSUME_FROM_FIRST_OFFSET | 从最早的消息开始消费 |
CONSUME_FROM_TIMESTAMP | 从指定时间开始消费 |
注:如果使用CONSUME_FROM_TIMESTAMP ,需设置参数 DefaultMQPushConsumer.setConsumeTimestamp(“20131223171201”) 时间戳字符串格式为yyyyMMddHHmmss
- DefaultMQPushConsumer.subscribe(String topic, String subExpression) subExpression参数为tag选择表达式 语法:
- 不过滤tag:"*" 或者null
- 根据多个tag过滤:“tag1 || tag2 || tag3”
- DefaultMQPushConsumer.registerMessageListener注册消费监听器
- 顺序消费监听器MessageListenerOrderly 返回ConsumeOrderlyStatus
枚举值 | 效果 |
---|---|
SUCCESS | 成功 |
ROLLBACK | 废弃 |
COMMIT | 废弃 |
SUSPEND_CURRENT_QUEUE_A_MOMENT | 暂停当前队列一会 |
- 并行消费监听器MessageListenerConcurrently 返回ConsumeConcurrentlyStatus
枚举值 | 效果 |
---|---|
CONSUME_SUCCESS | 成功 |
RECONSUME_LATER | 失败,稍后重试 |
源码
- org.apache.rocketmq.client.impl.consumer.RebalancePushImpl
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
default:
break;
}