浅谈基于 Zookeeper 实现分布式锁对 Maxwell 完成高可用

2022-05-17 16:09:06 浏览数 (1)

1. 背景

❝麦斯威尔CDC框架使用方法,但后来声称基于筏子的框架实现了很高的可用性,存在MySQL协议进行相关测试试验发现上的问题,然后还是通过性克隆这个框架,通过Zookeeper框架,完成对Maxwell的高可用。 ❞

2.原理

2.1.文字介绍

❝分布式服务通过在代码里约定的路径向动物园管理员中注册自己,注意这里注册需要「临时有序」的子节点,分布式服务根据自己注册完成的子节点的先后顺序,依次监听自己前置位的子等,当 1.「变成子节点的时候」消失,且 2. 自己为当前的 Zookeeper 路径下节点号的最小节点的时候,开启自己的服务端。

  • 应该是为了更好地服务于他人的陪伴
  • 临时的目的是为了当前设备由于停机机,能够从动物园管理员撤掉自己,给服务的“腾位置”

2.2. 图示介绍

3.代码实现

3.1.修改pom文件

代码语言:javascript复制
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.11.1</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.1</version>
</dependency>

3.2.修改框架入口类com.zendesk.maxwell.Maxwell的主要函数

代码语言:javascript复制
public static void main(String[] args) {
  try {
   Logging.setupLogBridging();
   MaxwellConfig config = new MaxwellConfig(args);

   if ( config.log_level != null ) {
    Logging.setLevel(config.log_level);
   }

   final Maxwell maxwell = new Maxwell(config);

   Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
     maxwell.terminate();
     StaticShutdownCallbackRegistry.invoke();
    }
   });

   LOGGER.info("Starting Maxwell. maxMemory: "   Runtime.getRuntime().maxMemory()   " bufferMemoryUsage: "   config.bufferMemoryUsage);

   /*
   if ( config.haMode ) {
    new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
   } else {
    maxwell.start();
   }
    */
   if ( config.haMode ) {
    CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
    curatorUtil.highAvailable();
   }
   maxwell.start();

  } catch ( SQLException e ) {
   // catch SQLException explicitly because we likely don't care about the stacktrace
   LOGGER.error("SQLException: "   e.getLocalizedMessage());
   System.exit(1);
  } catch ( URISyntaxException e ) {
   // catch URISyntaxException explicitly as well to provide more information to the user
   LOGGER.error("Syntax issue with URI, check for misconfigured host, port, database, or JDBC options (see RFC 2396)");
   LOGGER.error("URISyntaxException: "   e.getLocalizedMessage());
   System.exit(1);
  } catch ( ServerException e ) {
   LOGGER.error("Maxwell couldn't find the requested binlog, exiting...");
   System.exit(2);
  } catch ( Exception e ) {
   e.printStackTrace();
   System.exit(1);
  }
 }

3.3.新增代码 com.zendesk.maxwell.util.CuratorUtil

代码语言:javascript复制
package com.zendesk.maxwell.util;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CuratorUtil {
 private String zookeeperServers;
 private int sessionTimeoutMs;
 private int connectionTimeoutMs;
 private int baseSleepTimeMs;
 private int maxRetries;
 private CuratorFramework client;
 private String lockPath = "/maxwell/ha/lock";
 private String leaderPath = "/maxwell/ha/leader";


 public CuratorUtil(String zookeeperServers,int sessionTimeoutMs,int connectionTimeoutMs,int baseSleepTimeMs,int maxRetries){
  this.zookeeperServers = zookeeperServers;
  this.sessionTimeoutMs = sessionTimeoutMs;
  this.connectionTimeoutMs = connectionTimeoutMs;
  this.baseSleepTimeMs = baseSleepTimeMs;
  this.maxRetries = maxRetries;
 }

 /*
  * 构造 zookeeper 客户端,并连接 zookeeper 集群
  */
 public void start(){
  ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
  client = CuratorFrameworkFactory.newClient(
    this.zookeeperServers,
    this.sessionTimeoutMs,
    this.connectionTimeoutMs,
    retryPolicy
  );
  client.start();
 }

 /*
  * 实现分布式锁
  */
 public void highAvailable(){
  // 1.连接 Zookeeper 客户端
  this.start();
  // 2.向 zookeeper 注册自己
  InterProcessMutex lock = new InterProcessMutex(client, lockPath);
  try {
   // 3.获取锁
   lock.acquire();
   // 4.将自己信息注册到 leader 路径
   client.create()
     .withMode(CreateMode.EPHEMERAL)
     .forPath(leaderPath);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

3.4.修改代码com.zendesk.maxwell.MaxwellConfig

代码语言:javascript复制
// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;

// 函数 MaxwellOptionParser 新增代码
parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
    .withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
    .withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
    .withRequiredArg();
parser.accepts( "max_retries", "max retry times" )
    .withRequiredArg();

// 函数 setup 新增代码
this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode){
 if (zookeeperServers == null){
  LOGGER.warn("you must specify --zookeeper because you want to use maxwell in ha mode");
 }
}

4.说明

❝需要修改源代码是基于 1.29.2 完成对源代码的相关版本,使用高版本,按照相同的步骤对源代码进行修改。 ❞

0 人点赞