Preface
This article looks at the getFlatList method of RocketMQCanalConnector.
getFlatList
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
public class RocketMQCanalConnector implements CanalMQConnector {

    private static final Logger logger = LoggerFactory.getLogger(RocketMQCanalConnector.class);
    private static final String CLOUD_ACCESS_CHANNEL = "cloud";
    private String nameServer;
    private String topic;
    private String groupName;
    private volatile boolean connected = false;
    private DefaultMQPushConsumer rocketMQConsumer;
    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
    private int batchSize = -1;
    private long batchProcessTimeout = 60 * 1000;
    private boolean flatMessage;
    private volatile ConsumerBatchMessage lastGetBatchMessage = null;
    private String accessKey;
    private String secretKey;
    private String customizedTraceTopic;
    private boolean enableMessageTrace = false;
    private String accessChannel;
    private String namespace;

    //......

    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        // ack only when a batch was actually returned
        if (messages != null && !messages.isEmpty()) {
            ack();
        }
        return messages;
    }

    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        try {
            // the previous batch must be acked before the next one is fetched
            if (this.lastGetBatchMessage != null) {
                throw new CanalClientException("mq get/ack not support concurrent & async ack");
            }
            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
            if (batchMessage != null) {
                this.lastGetBatchMessage = batchMessage;
                return batchMessage.getData();
            }
        } catch (InterruptedException ex) {
            logger.warn("Get message timeout", ex);
            throw new CanalClientException("Failed to fetch the data after: " + timeout);
        }
        // nothing arrived before the timeout
        return Lists.newArrayList();
    }

    public void ack() throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.ack();
            }
        } catch (Throwable e) {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        } finally {
            // always clear the pending batch
            this.lastGetBatchMessage = null;
        }
    }

    //......
}
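- getFlatList in RocketMQCanalConnector fetches a list of FlatMessage through getFlatListWithoutAck and calls ack() only when that list is non-empty. getFlatListWithoutAck polls messageBlockingQueue for a batchMessage; if one arrives before the timeout, it is stored in lastGetBatchMessage and batchMessage.getData() is returned, otherwise an empty list is returned. It throws CanalClientException if lastGetBatchMessage is still set, that is, if the previous batch has not been acked. ack() calls lastGetBatchMessage.ack(), calls lastGetBatchMessage.fail() if that throws, and always resets lastGetBatchMessage to null.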
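The get / ack contract described above can be modelled with plain JDK classes. The following is only a minimal sketch: SketchBatch, SketchConnector and GetAckDemo are hypothetical names invented for illustration, String stands in for FlatMessage, and IllegalStateException stands in for CanalClientException. It is not the canal implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ConsumerBatchMessage: holds data and records whether it was acked.
class SketchBatch {
    final List<String> data;
    volatile boolean acked = false;

    SketchBatch(List<String> data) { this.data = data; }

    void ack() { acked = true; }
}

// Hypothetical, simplified model of the get / ack contract; not the canal class.
class SketchConnector {
    final BlockingQueue<SketchBatch> queue = new LinkedBlockingQueue<>();
    private volatile SketchBatch lastBatch = null;

    // Mirrors getFlatListWithoutAck: refuse a second fetch while a batch is unacked.
    List<String> getWithoutAck(long timeout, TimeUnit unit) {
        if (lastBatch != null) {
            throw new IllegalStateException("previous batch has not been acked");
        }
        try {
            SketchBatch batch = queue.poll(timeout, unit);
            if (batch != null) {
                lastBatch = batch;
                return batch.data;
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", ex);
        }
        return new ArrayList<>(); // timed out: empty list, never null
    }

    // Mirrors ack(): the pending batch is always cleared.
    void ack() {
        try {
            if (lastBatch != null) lastBatch.ack();
        } finally {
            lastBatch = null;
        }
    }

    // Mirrors getFlatList: ack only when something was returned.
    List<String> get(long timeout, TimeUnit unit) {
        List<String> messages = getWithoutAck(timeout, unit);
        if (!messages.isEmpty()) ack();
        return messages;
    }
}

class GetAckDemo {
    public static void main(String[] args) {
        SketchConnector c = new SketchConnector();
        SketchBatch b = new SketchBatch(Collections.singletonList("row-1"));
        c.queue.add(b);
        System.out.println(c.get(10, TimeUnit.MILLISECONDS)); // the queued batch
        System.out.println(b.acked);                          // set by get() via ack()
        System.out.println(c.get(10, TimeUnit.MILLISECONDS)); // empty list after the timeout
    }
}
```

The single lastBatch slot is what makes the contract strictly one batch at a time: a caller that uses getWithoutAck must call ack() before fetching again.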
subscribe
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
public class RocketMQCanalConnector implements CanalMQConnector {

    //......

    public synchronized void subscribe(String filter) throws CanalClientException {
        if (connected) {
            return;
        }
        try {
            if (rocketMQConsumer == null) {
                this.connect();
            }
            rocketMQConsumer.subscribe(this.topic, "*");
            rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {

                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    boolean isSuccess = process(messageExts);
                    if (isSuccess) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
            rocketMQConsumer.start();
        } catch (MQClientException ex) {
            connected = false;
            logger.error("Start RocketMQ consumer error", ex);
        }
        connected = true;
    }

    //......
}
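- subscribe subscribes rocketMQConsumer to the topic with the tag expression "*" and registers a MessageListenerOrderly whose consumeMessage calls process; the listener returns ConsumeOrderlyStatus.SUCCESS when process returns true and SUSPEND_CURRENT_QUEUE_A_MOMENT otherwise. In the code shown, the filter argument is not used, and connected is set to true after the try/catch, so it ends up true even when an MQClientException was caught.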
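The effect of the two return values can be illustrated with a small JDK-only model. This is a sketch under stated assumptions: SketchStatus and OrderlyRetrySketch are hypothetical names, and the retry loop is a deliberately simplified picture of ordered consumption, in which the same batch is offered again after the listener asks to suspend the queue. The real RocketMQ consumer waits for a suspend interval between attempts and applies its own retry limits, none of which is modelled here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical two-value status mirroring the two results the listener returns.
enum SketchStatus { SUCCESS, SUSPEND }

class OrderlyRetrySketch {

    // Maps the boolean outcome of processing to a status, as the anonymous listener does.
    static SketchStatus consume(List<String> batch, Predicate<List<String>> process) {
        return process.test(batch) ? SketchStatus.SUCCESS : SketchStatus.SUSPEND;
    }

    // Simplified model of ordered delivery: the same batch is offered again
    // until the listener reports SUCCESS or maxAttempts is reached.
    // Returns the number of attempts made.
    static int deliver(List<String> batch, Predicate<List<String>> process, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            if (consume(batch, process) == SketchStatus.SUCCESS) {
                break;
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The stand-in for process fails on the first two calls and succeeds on the third.
        int attempts = deliver(Arrays.asList("m1", "m2"), b -> ++calls[0] >= 3, 5);
        System.out.println(attempts); // prints 3
    }
}
```

Because a failed batch is offered again rather than skipped, a client that never acks in time will see the same messages repeatedly, so downstream handling should tolerate duplicates.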
process
canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java
public class RocketMQCanalConnector implements CanalMQConnector {

    //......

    private boolean process(List<MessageExt> messageExts) {
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", messageExts);
        }
        List messageList = Lists.newArrayList();
        for (MessageExt messageExt : messageExts) {
            byte[] data = messageExt.getBody();
            if (data != null) {
                try {
                    if (!flatMessage) {
                        Message message = CanalMessageDeserializer.deserializer(data);
                        messageList.add(message);
                    } else {
                        FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
                        messageList.add(flatMessage);
                    }
                } catch (Exception ex) {
                    logger.error("Add message error", ex);
                    throw new CanalClientException(ex);
                }
            } else {
                logger.warn("Received message data is null");
            }
        }
        ConsumerBatchMessage batchMessage;
        if (!flatMessage) {
            batchMessage = new ConsumerBatchMessage<Message>(messageList);
        } else {
            batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
        }
        try {
            messageBlockingQueue.put(batchMessage);
        } catch (InterruptedException e) {
            logger.error("Put message to queue error", e);
            throw new RuntimeException(e);
        }
        boolean isCompleted;
        try {
            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
        } catch (InterruptedException e) {
            logger.error("Interrupted when waiting messages to be finished.", e);
            throw new RuntimeException(e);
        }
        boolean isSuccess = batchMessage.isSuccess();
        return isCompleted && isSuccess;
    }

    //......
}
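- process converts the body of each MessageExt into a Message or a FlatMessage, depending on the flatMessage flag, wraps the results in a ConsumerBatchMessage, and puts it on messageBlockingQueue. It then blocks in batchMessage.waitFinish(batchProcessTimeout), where batchProcessTimeout defaults to 60 * 1000, and returns true only if the batch finished within that time and batchMessage.isSuccess() is true.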
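The blocking handoff between process and the client thread can be sketched with a CountDownLatch. Note the assumptions: the source of ConsumerBatchMessage is not shown in this article, so LatchBatch below is a hypothetical stand-in that merely assumes a latch-based waitFinish / ack / fail, and HandoffSketch and startConsumer are names invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ConsumerBatchMessage, assumed here to be latch-based.
class LatchBatch {
    final List<String> data;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean failed = false;

    LatchBatch(List<String> data) { this.data = data; }

    void ack() { latch.countDown(); }
    void fail() { failed = true; latch.countDown(); }
    boolean isSuccess() { return !failed; }

    // Returns false if nobody called ack() or fail() within the timeout.
    boolean waitFinish(long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

class HandoffSketch {
    final BlockingQueue<LatchBatch> queue = new LinkedBlockingQueue<>();

    // Producer side, mirroring process(): enqueue, then block until acked, failed, or timed out.
    boolean process(List<String> messages, long timeoutMillis) {
        LatchBatch batch = new LatchBatch(messages);
        try {
            queue.put(batch);
            boolean completed = batch.waitFinish(timeoutMillis);
            return completed && batch.isSuccess();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Consumer side: a background thread that takes one batch and acks or fails it.
    Thread startConsumer(boolean succeed) {
        Thread t = new Thread(() -> {
            try {
                LatchBatch batch = queue.take();
                if (succeed) batch.ack(); else batch.fail();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) {
        HandoffSketch h = new HandoffSketch();
        h.startConsumer(true);
        System.out.println(h.process(Arrays.asList("m1"), 1000)); // a consumer acks the batch
        System.out.println(h.process(Arrays.asList("m2"), 50));   // nobody acks, so the wait times out
    }
}
```

In this model the listener thread is held until the client acks, which is why a slow client directly slows down consumption from the queue.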
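Summary
getFlatList in RocketMQCanalConnector fetches a list of FlatMessage through getFlatListWithoutAck and calls ack() only when the list is non-empty. getFlatListWithoutAck polls messageBlockingQueue for a batchMessage; if it is not null, it is stored in lastGetBatchMessage and batchMessage.getData() is returned. ack() calls lastGetBatchMessage.ack(), and calls lastGetBatchMessage.fail() if an exception is thrown.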
doc
- RocketMQCanalConnector