pmq学习七-重平衡

2021-02-03 10:09:22 浏览数 (1)

可以看到在pmq-ui中启动时会执行启动ComsumerGroupRb服务,同时还会启动MessageLagNotify服务以及Queue队列服务。消费组重平衡服务、db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等

为了解决consumer和topic数量动态变化造成的问题,引入了重平衡(即consumer和queue的动态分配。

PMQ有一个重平衡器,它用来监控consumer的加入和退出、topic的扩容和缩容。

当某一个consumerGroup下的consumer数量发生变化,或者该consumerGroup订阅的topic的queue数量 发生了变化,就会触发重平衡器对该consumerGroup进行重平衡操作。

重平衡器对需要重平衡的consumerGroup,进行consumer和queue的重新分配。

代码语言:javascript复制
//重平衡分配器 初始化
@PostConstruct
private void init() {
  //初始化
  super.init(Constants.RB, soaConfig.getRbCheckInterval(), soaConfig);

  soaConfig.registerChanged(new Runnable() {
     //通过soa配置获取重平衡检查interval
     private volatile int interval = soaConfig.getRbCheckInterval(); @Override
 public void run() {
    if (soaConfig.getRbCheckInterval() != interval) {
       interval = soaConfig.getRbCheckInterval();
       updateInterval(interval);
    }
 }

  });
}

进行初始化:

代码语言:javascript复制
//注意interval为master判断间隔时间,当强行删除mqlock数据中的某条记录时,如果应用都启动了,必须等待一个interval周期才会开始新的master选举过程,如果有新的应用产生则进行新的选举,选择采取先到先得原则
public void init(String key, int interval, SoaConfig soaConfig) {
   this.key = key;
   mqLockService = new MqLockServiceImpl(key);
   executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),
         SoaThreadFactory.create(key, true), new ThreadPoolExecutor.DiscardOldestPolicy());
   this.interval = interval;
   this.soaConfig = soaConfig;
   this.traceMessage = TraceFactory.getInstance(key);
}

执行mq锁定master操作:

代码语言:javascript复制
//mq锁定服务实现
public MqLockServiceImpl(String key) {
    this.key = key;
    this.emailUtil = SpringUtil.getBean(EmailUtil.class);
    this.traceMessage = TraceFactory.getInstance("lock-"   key);
    this.heartbeatProperty = new HeartbeatProperty() {
        @Override
        public int getValue() {
            //获取mq锁定心跳间隔
            return soaConfig.getMqLockHeartBeatTime();
        }
    };
}

执行心跳操作:

代码语言:javascript复制
// mq锁心跳发送时间间隔
public int getMqLockHeartBeatTime() {
   try {
      if (!_getMqLockHeartBeatTime
            .equals(env.getProperty(env_getMqLockHeartBeatTime_key, env_getMqLockHeartBeatTime_defaultValue))) {
         _getMqLockHeartBeatTime = env.getProperty(env_getMqLockHeartBeatTime_key,
               env_getMqLockHeartBeatTime_defaultValue);
         getMqLockHeartBeatTime = Integer.parseInt(
               env.getProperty(env_getMqLockHeartBeatTime_key, env_getMqLockHeartBeatTime_defaultValue));
         if (getMqLockHeartBeatTime < 15) {
            getMqLockHeartBeatTime = 15;
         }
         onChange();
      }
   } catch (Exception e) {
      getMqLockHeartBeatTime = 15;
      onChange();
      log.error("getgetMqLockHeartBeatTime_SoaConfig_error", e);
   }
   return getMqLockHeartBeatTime;

}

执行onChange操作:而onChange就是启动服务操作,根据key来启动服务

代码语言:javascript复制
//执行onChange方法
private void onChange() {
   executor.execute(() -> {
      for (Runnable runnable : changed.keySet()) {
         try {
            runnable.run();
         } catch (Exception e) {
            log.error("onchange-error", e);
         }
      }
   });
}

同时会将服务线程放入map中:

代码语言:javascript复制
public void registerChanged(Runnable runnable) {
   changed.put(runnable, true);
}

启动重平衡操作:重平衡启动核心,里面涉及到重要操作:

代码语言:javascript复制
initNotifyMessageStatId(); 初始化通知消息statId
initRbData(consumerGroupEntities, consumerGroupMap); 初始化重平衡数据
consumerGroupService.rb(t1.queueOffsets); 重平衡
addRbCompleteLog(t1); 添加重平衡日志
updateNotifyMessageId(currentMaxId); 更新通知消息id

执行启动:重平衡

代码语言:javascript复制
//执行启动
public void doStart() {
    //如果是重平衡
    if (!soaConfig.isEnableRb()) {
        return;
    }
    //是否是master
    if (lastMaster != isMaster()) {
        //检查notifyMessageStatId,初始化notifyMessageStatId
        if (!checkNotifyMessageStatId()) {
            initNotifyMessageStatId();
        }
        //master
        lastMaster=isMaster();
    }
    //获取通知消息id
    long currentMaxId = getNotifyMessageId();
    if (currentMaxId == 0) {
        return;
    }
    //消费组实体列表
    List<ConsumerGroupEntity> consumerGroupEntities = consumerGroupService
        .getLastRbConsumerGroup(lastNotifyMessageId, currentMaxId);
    if (CollectionUtils.isEmpty(consumerGroupEntities)) {
        return;
    }
    Map<Long, ConsumerGroupQuqueVo> consumerGroupMap = new HashMap<>();
    //执行初始化重平衡数据 重要
    initRbData(consumerGroupEntities, consumerGroupMap);
    for (ConsumerGroupQuqueVo t1 : consumerGroupMap.values()) {
        rb(t1);
        for (int i = 0; i < 3; i  ) {
            try {
                //如果是master,则执行重平衡操作,重要执行rb操作
                if (isMaster()) {
                    consumerGroupService.rb(t1.queueOffsets);
                }
                break;
            } catch (Exception e) {
                log.error("doCheckRebalance_error", e);
                Util.sleep(5000);
            }
        }
        //添加重平衡日志
        addRbCompleteLog(t1);
    }
    //更新通知消息id
    updateNotifyMessageId(currentMaxId);
    //减少不活跃的消费者 计数
    int count = consumerGroupConsumerService.deleteUnActiveConsumer();
    if (count > 0) {
        log.info("consumerGroupConsumer_empty,count is "   count);
    }
}

除了这个之外,具体的调用重平衡的操作,执行rb操作,类似于下面的代码带rb重平衡的代码:

代码语言:javascript复制
//执行rb重平衡操作
@Override
// @Transactional(rollbackFor = Exception.class)
public void rb(List<QueueOffsetEntity> queueOffsetEntities) {
   Map<Long, String> idsMap = new HashMap<>(30);
   List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>(30);
   //将传入的查询偏移量的消费组id放入,并执行更新操作
   queueOffsetEntities.forEach(t1 -> {
      idsMap.put(t1.getConsumerGroupId(), "");
      NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
      notifyMessageEntity.setConsumerGroupId(t1.getConsumerGroupId());
      notifyMessageEntity.setMessageType(MessageType.Meta);
      notifyMessageEntities.add(notifyMessageEntity);
      // 更新consumerid 和consumername
      queueOffsetService.updateConsumerId(t1);
   });
   // 更新重平衡版本,注意这个代码非常的重要,这个可以保证客户端能够拿到最新的重平衡版本号
   updateRbVersion(new ArrayList<>(idsMap.keySet()));
   // 批量插入消息事件
   notifyMessageService.insertBatch(notifyMessageEntities);

}

或者:

代码语言:javascript复制
// @Transactional(rollbackFor = Exception.class)
@Override
public void notifyRb(long id) {
   updateRbVersion(Arrays.asList(id));
   List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>();
   NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Rb);
   notifyMessageEntities.add(notifyMessageEntity);

   notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Meta);
   notifyMessageEntities.add(notifyMessageEntity);
   notifyMessageService.insertBatch(notifyMessageEntities);
}

执行重平衡操作的代码在ConsumerServiceImpl和ConsumerGroupServiceImpl中都有体现。这可以从消费者、消费组里面看到。

类似的,可以分析db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等。

0 人点赞