我们知道,在RocketMQ中,服务端指的是broker,客户端则是生产者和消费者。pmq也是如此:服务端是broker,客户端是生产者和消费者。客户端与Spring的集成从MQ启动处理器(MqStartProcessor)开始:它实现了BeanFactoryPostProcessor,并重写了postProcessBeanFactory方法(Bean工厂后置处理)。下面的流程基本上都会涉及IMqFactory上的接口。
客户端启动流程:
MqClient启动:
代码语言:javascript复制public class MqClientStartup {
//spring初始化完成 ,重要
public static void springInitComplete() {
MqClient.start();
monitorConfig();
}
//初始化
public static void init(Environment env1) {
if (initFlag.compareAndSet(false, true)) {
env = env1;
initConfig();
}
}
//监控配置
private static void monitorConfig() {
executor.execute(new Runnable() {
@Override
public void run() {
while (isRunning) {
updateConfig();
Util.sleep(2000);
}
}
});
}
//更新配置
protected static void updateConfig() {
if (properties == null) {
properties = MqClient.getContext().getConfig().getProperties();
}
setRbTimes();
setPbTimes();
setAsynCapacity();
setMetaMode();
setPullDeltaTime();
setPublishAsynTimeout();
}
}
而这里可以看到启动MqClient.start():
代码语言:javascript复制public static boolean start() {
//启动时,会注册消费者组
if (startFlag.compareAndSet(false, true)) {
registerConsumerGroup();
}
return false;
}
//可以看到只要启动时才会注册消费者组
public static boolean registerConsumerGroup(Map<String, ConsumerGroupVo> groups) {
if (groups == null || groups.size() == 0) {
return false;
}
//如果已经初始化,此时执行注册消费组
if (hasInit()) {
log.info("已经初始化完成!");
return doRegisterConsumerGroup(groups);
} else {
//否者等待初始化完成,然后执行注册消费组
log.warn("系统为初始化,启动异步注册!");
executor.execute(new Runnable() {
public void run() {
while (!hasInit()) {
Util.sleep(2000);
}
doRegisterConsumerGroup(groups);
});
return true;
}
}
执行注册消费组
代码语言:javascript复制private static boolean doRegisterConsumerGroup(Map<String, ConsumerGroupVo> groups) {
//变量消费者组
for (ConsumerGroupVo consumerGroup : groups.values()) {
//校验消费组
if (!checkVaild(consumerGroup)) {
return false;
}
//如果拿到的消费者组版本包含消费者组拿到的元数据中的名称,则设置为false,同时提示已订阅
if (mqContext.getConsumerGroupVersion().containsKey(consumerGroup.getMeta().getName())) {
log.info("ConsumerGroup:" consumerGroup.getMeta().getName() " has subscribed,已订阅!");
return false;
}
//消费者组通过元数据拿到originName如果为空,则设置originName
if (Util.isEmpty(consumerGroup.getMeta().getOriginName())) {
consumerGroup.getMeta().setOriginName(consumerGroup.getMeta().getName());
}
//消费组拿到topic如果不为空,则消费组名称放入信息
if (consumerGroup.getTopics() != null) {
consumerGroupNames.put(consumerGroup.getMeta().getOriginName(),
new ArrayList<>(consumerGroup.getTopics().keySet()));
} else {
consumerGroupNames.put(consumerGroup.getMeta().getOriginName(), new ArrayList<>());
}
groupNames = consumerGroup.getMeta().getName() ",";
}
//事件触发,将注册设置为true
register();
//消费组注册请求,request设置信息包括:消费者名称、消费者id、客户端id、消费者名称、设置subEnv
ConsumerGroupRegisterRequest request = new ConsumerGroupRegisterRequest();
request.setConsumerGroupNames(consumerGroupNames);
request.setConsumerId(mqContext.getConsumerId());
request.setClientIp(mqContext.getConfig().getIp());
request.setConsumerName(mqContext.getConsumerName());
if (MqClient.getMqEnvironment() != null) {
if (MqEnv.FAT == MqClient.getMqEnvironment().getEnv()) {
request.setSubEnv(MqClient.getMqEnvironment().getSubEnv().toLowerCase());
}
}
try {
//消费者组注册响应,注册消费者,放入请求
ConsumerGroupRegisterResponse consumerGroupRegisterResponse = mqContext.getMqResource()
.registerConsumerGroup(request);
//如果消费者组注册响应为success,则填充信息
if (consumerGroupRegisterResponse.isSuc()) {
Map<String, String> broadcastConsumerGroupNames = consumerGroupRegisterResponse
.getConsumerGroupNameNew();
for (ConsumerGroupVo consumerGroup : groups.values()) {
if (broadcastConsumerGroupNames != null
&& broadcastConsumerGroupNames.containsKey(consumerGroup.getMeta().getOriginName())) {
consumerGroup.getMeta()
.setName(broadcastConsumerGroupNames.get(consumerGroup.getMeta().getOriginName()));
}
mqContext.getConfigConsumerGroup().put(consumerGroup.getMeta().getName(), consumerGroup);
mqContext.getConsumerGroupVersion().put(consumerGroup.getMeta().getName(), 0L);
//fire消费者组注册事件为true getRegisterConsumerGroupListeners()线程启动
fireConsumerGroupRegisterEvent(consumerGroup);
}
//创建消费者polling服务,并启动consumerPollingService服务,mqCheckService创建,同时启动
consumerPollingService = mqFactory.createConsumerPollingService();
consumerPollingService.start();
mqCheckService = mqFactory.createMqCheckService();
mqCheckService.start();
// MqCheckService.getInstance().start(mqContext);
log.info(groupNames " subscribe_suc,订阅成功!and json is " JsonUtil.toJson(request));
} else {
throw new RuntimeException("registerConsumerGroup_error, the req is" JsonUtil.toJsonNull(request)
",and resp is " JsonUtil.toJson(consumerGroupRegisterResponse));
}
} catch (Exception e) {
log.error("consumer_group_register_error", e);
throw new RuntimeException(e);
}
return true;
}
执行consumerPollingService、mqCheckService启动
代码语言:javascript复制@Override
public void start() {
if (startFlag.compareAndSet(false, true)) {
isStop = false;
runStatus = false;
executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100),
SoaThreadFactory.create("ConsumerPollingService", true),
new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(new Runnable() {
@Override
public void run() {
while (!isStop) {
//将允许状态设置为true,同时将traceMessaageItem设置为true,执行longPolling操作
TraceMessageItem traceMessageItem = new TraceMessageItem();
runStatus = true;
try {
traceMessageItem.status = "suc";
longPolling();
} catch (Exception e) {
// e.printStackTrace();
traceMessageItem.status = "fail";
Util.sleep(1000);
}
//链路追踪添加链路信息,同时设置为false
traceMsg.add(traceMessageItem);
runStatus = false;
}
}
});
}
}
start()在每一轮循环中先将runStatus置为true,再调用longPolling方法;longPolling内部会通过Tracer开启一个链路追踪事务:
代码语言:javascript复制protected void longPolling() {
if (mqContext.getConsumerId() > 0 && mqContext.getConsumerGroupVersion() != null
&& mqContext.getConsumerGroupVersion().size() > 0) {
//设置事务
Transaction transaction = Tracer.newTransaction("mq-group", "longPolling");
try {
//创建获取消费组请求对象,放入消费者id、消费者组版本
GetConsumerGroupRequest request = new GetConsumerGroupRequest();
request.setConsumerId(mqContext.getConsumerId());
request.setConsumerGroupVersion(mqContext.getConsumerGroupVersion());
//获取消费者组响应
GetConsumerGroupResponse response = mqResource.getConsumerGroup(request);
if (response != null && response.getConsumerDeleted() == 1) {
log.info("consumerid为" request.getConsumerId());
}
//执行处理group
handleGroup(response);
transaction.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
transaction.setStatus(e);
} finally {
transaction.complete();
}
} else {
Util.sleep(1000);
}
}
执行handleGroup:
代码语言:javascript复制protected void handleGroup(GetConsumerGroupResponse response) {
if (isStop) {
return;
}
//响应如果不为空,则设置broker元数据模式
if (response != null) {
mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
if (MqClient.getMqEnvironment() != null && MqClient.getMqEnvironment().getEnv() == MqEnv.FAT) {
MqClient.getContext().setAppSubEnvMap(response.getConsumerGroupSubEnvMap());
}
}
//mq客户端重新启动
if (response != null && response.getConsumerDeleted() == 1) {
MqClient.reStart();
Util.sleep(5000);
return;
} else if (response != null && response.getConsumerGroups() != null
&& response.getConsumerGroups().size() > 0) {
log.info("get_consumer_group_data,获取到的最新消费者组数据为:" JsonUtil.toJson(response));
TraceMessageItem item = new TraceMessageItem();
item.status = "changed";
item.msg = JsonUtil.toJson(response);
//响应拿到消费者组,进行遍历,如果没有stop,同时请求为新请求,请求放入,同时创建mq组线程服务
response.getConsumerGroups().entrySet().forEach(t1 -> {
if (!isStop) {
if (!mqExcutors.containsKey(t1.getKey())) {
mqExcutors.put(t1.getKey(), mqFactory.createMqGroupExcutorService());
}
log.info("consumer_group_data_change,消费者组" t1.getKey() "发生重平衡或者meta更新");
// 进行重平衡操作或者更新元数据信息
mqExcutors.get(t1.getKey()).rbOrUpdate(t1.getValue(), response.getServerIp());
mqContext.getConsumerGroupVersion().put(t1.getKey(), t1.getValue().getMeta().getVersion());
}
});
traceMsg.add(item);
}
// 然后启动
mqExcutors.values().forEach(t1 -> {
t1.start();
});
}
可以看到MqGroupExcutorService信息:
/**
 * Creates the group executor service around the given resource, taking the
 * context and the factory from {@code MqClient}.
 *
 * @param mqResource resource stored on this instance
 */
public MqGroupExcutorService(IMqResource mqResource) {
    this.mqResource = mqResource;
    this.mqContext = MqClient.getContext();
    this.mqFactory = MqClient.getMqFactory();
}
同时可以看到MqCheckService信息:
/**
 * Creates the check service around the given resource, taking the context
 * from {@code MqClient}.
 *
 * @param mqResource resource stored on this instance
 */
public MqCheckService(IMqResource mqResource) {
    this.mqResource = mqResource;
    this.mqContext = MqClient.getContext();
}
Mq启动后置处理器:这里是和Spring集成的方式:
代码语言:javascript复制@Component
public class MqStartProcessor implements BeanFactoryPostProcessor, PriorityOrdered, EnvironmentAware {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
if (environment != null) {
if (initFlag.compareAndSet(false, true)) {
logger.info("消息客户端开始初始化!");
MqClient.setSubscriberResolver(new SubscriberResolver());
//初始化
MqClientStartup.init(environment);
//MqClientStartup.start();
// statService.start();
logger.info("消息客户端初始化完成!");
}
}
}
}
mq客户端启动初始化:
代码语言:javascript复制public static void init(Environment env1) {
if (initFlag.compareAndSet(false, true)) {
env = env1;
initConfig();
}
}
/**
 * Reads the client settings (JVM system property first, then the
 * environment, then a default), initializes {@code MqClient} with a new
 * {@code MqConfig} and refreshes the dynamic configuration.
 */
private static void initConfig() {
    MqConfig config = new MqConfig();
    // NOTE(review): in this excerpt the values read below are never applied to
    // `config` — the setter calls appear to have been omitted; confirm upstream.
    String netCard = readSetting("mq.network.netCard", "");
    String url = readSetting("mq.broker.url", "");
    String host = readSetting("mq.client.host", "");
    String serverPort = readSetting("server.port", "8080");
    String asynCapacity = readSetting("mq.asyn.capacity", "2000");
    String rbTimes = readSetting("mq.rb.times", "4");
    String pbRetryTimes = readSetting("mq.pb.retry.times", "10");
    String readTimeOut = readSetting("mq.http.timeout", "10000");
    String pullDeltaTime = readSetting("mq.pull.time.delta", "150");
    boolean metaMode = "true".equals(readSetting("mq.broker.metaMode", "true"));
    // Initialize the client, then apply the dynamic configuration.
    MqClient.init(config);
    updateConfig();
}

/** Looks up {@code key} as a system property, falling back to the environment and then to {@code fallback}. */
private static String readSetting(String key, String fallback) {
    return System.getProperty(key, env.getProperty(key, fallback));
}
进行初始化:
代码语言:javascript复制public static void init(MqConfig config) {
if (initFlag.compareAndSet(false, true)) {
//执行初始化
doInit(config);
//激活初始化事件 getInitCompleted 线程启动
fireInitEvent();
log.info("mq_client has inited,初始化完成");
}
}
//执行初始化
private static void doInit(MqConfig config) {
//设置消费者名称
mqContext.setConsumerName(
ConsumerUtil.getConsumerId(config.getIp(), PropUtil.getProcessId() "", config.getServerPort()));
//如果mq上下文里面的mq信息为null,mqContext填充配置信息,同时异步消息为空,则创建msgAsyn队列。
if (mqContext.getMqResource() == null) {
mqContext.setMqResource(
getMqFactory().createMqResource(config.getUrl(), config.getReadTimeOut(), config.getReadTimeOut()));
}
mqContext.setConfig(config);
if (msgsAsyn == null) {
msgsAsyn = new ArrayBlockingQueue<>(config.getAsynCapacity());
}
//创建mqBrokerUrlRefresh服务,同时启动mqBrokerUrl刷新服务
mqBrokerUrlRefreshService = mqFactory.createMqBrokerUrlRefreshService();
mqBrokerUrlRefreshService.start();
}
启动mqBrokerUrlRefreshService:
代码语言:javascript复制@Override
public void start() {
if (startFlag.compareAndSet(false, true)) {
isStop = false;
runStatus = false;
//执行更新brokerUrls操作,使用单例线程池
doUpdateBrokerUrls();
executor = Executors.newScheduledThreadPool(1,
SoaThreadFactory.create("mq-brokerFreshService-pool-%d", Thread.MAX_PRIORITY - 1, true));
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (!isStop) {
runStatus = true;
doUpdateBrokerUrls();
runStatus = false;
}
}
}, 1, 20, TimeUnit.SECONDS);
}
}
执行broker地址更新(doUpdateBrokerUrls):
代码语言:javascript复制protected void doUpdateBrokerUrls() {
try {
GetMetaGroupResponse response = this.mqResource.getMetaGroup(request);
if(response==null){
return;
}
if (response != null && response.isSuc()) {
mqContext.setBrokerMetaMode(response.getBrokerMetaMode());
mqContext.setMetricUrl(response.getMetricUrl());
if(Util.isEmpty(mqContext.getMetricUrl())){
//MqMeticReporterService.getInstance(mqClientBase).close();
MqClient.getMqFactory().createMqMeticReporterService().close();
}else{
//创建mqMeticReportService,并启动
MqClient.getMqFactory().createMqMeticReporterService().start();
}
}
if (mqContext.getBrokerMetaMode() == 1 || (mqContext.getBrokerMetaMode() == 0 && mqContext.getConfig().isMetaMode())) {
//List<String> brokerUrls = response.getBrokerIp();
if (response.getBrokerIpG1()!=null) {
mqContext.setBrokerUrls(response.getBrokerIpG1(),response.getBrokerIpG2());
}
} else if (mqContext.getBrokerMetaMode() == -1 || !mqContext.getConfig().isMetaMode()) {
mqContext.setBrokerUrls(new ArrayList<>(),new ArrayList<>());
}
} catch (Exception e) {
log.error("updateBrokerError", e);
}
}
执行数据上报服务:
代码语言:javascript复制@Override
public void start() {
if (startFlag.compareAndSet(false, true)) {
// 每30s上报数据
reporter.start(30, TimeUnit.SECONDS);
}
}