1. 背景
❝麦斯威尔CDC框架使用方法,但后来声称基于筏子的框架实现了很高的可用性,存在MySQL协议进行相关测试试验发现上的问题,然后还是通过性克隆这个框架,通过Zookeeper框架,完成对Maxwell的高可用。 ❞
2.原理
2.1.文字介绍
❝分布式服务通过在代码里约定的路径向动物园管理员中注册自己,注意这里注册需要「临时有序」的子节点,分布式服务根据自己注册完成的子节点的先后顺序,依次监听自己前置位的子等,当 1.「变成子节点的时候」消失,且 2. 自己为当前的 Zookeeper 路径下节点号的最小节点的时候,开启自己的服务端。
- 应该是为了更好地服务于他人的陪伴
- 临时的目的是为了当前设备由于停机机,能够从动物园管理员撤掉自己,给服务的“腾位置”
❞
2.2. 图示介绍
3.代码实现
3.1.修改pom文件
代码语言:javascript复制<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.1</version>
</dependency>
3.2.修改框架入口类com.zendesk.maxwell.Maxwell的主要函数
代码语言:javascript复制public static void main(String[] args) {
try {
Logging.setupLogBridging();
MaxwellConfig config = new MaxwellConfig(args);
if ( config.log_level != null ) {
Logging.setLevel(config.log_level);
}
final Maxwell maxwell = new Maxwell(config);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
maxwell.terminate();
StaticShutdownCallbackRegistry.invoke();
}
});
LOGGER.info("Starting Maxwell. maxMemory: " Runtime.getRuntime().maxMemory() " bufferMemoryUsage: " config.bufferMemoryUsage);
/*
if ( config.haMode ) {
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {
maxwell.start();
}
*/
if ( config.haMode ) {
CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
curatorUtil.highAvailable();
}
maxwell.start();
} catch ( SQLException e ) {
// catch SQLException explicitly because we likely don't care about the stacktrace
LOGGER.error("SQLException: " e.getLocalizedMessage());
System.exit(1);
} catch ( URISyntaxException e ) {
// catch URISyntaxException explicitly as well to provide more information to the user
LOGGER.error("Syntax issue with URI, check for misconfigured host, port, database, or JDBC options (see RFC 2396)");
LOGGER.error("URISyntaxException: " e.getLocalizedMessage());
System.exit(1);
} catch ( ServerException e ) {
LOGGER.error("Maxwell couldn't find the requested binlog, exiting...");
System.exit(2);
} catch ( Exception e ) {
e.printStackTrace();
System.exit(1);
}
}
3.3.新增代码 com.zendesk.maxwell.util.CuratorUtil
代码语言:javascript复制package com.zendesk.maxwell.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CuratorUtil {
private String zookeeperServers;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private int baseSleepTimeMs;
private int maxRetries;
private CuratorFramework client;
private String lockPath = "/maxwell/ha/lock";
private String leaderPath = "/maxwell/ha/leader";
public CuratorUtil(String zookeeperServers,int sessionTimeoutMs,int connectionTimeoutMs,int baseSleepTimeMs,int maxRetries){
this.zookeeperServers = zookeeperServers;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxRetries = maxRetries;
}
/*
* 构造 zookeeper 客户端,并连接 zookeeper 集群
*/
public void start(){
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
client = CuratorFrameworkFactory.newClient(
this.zookeeperServers,
this.sessionTimeoutMs,
this.connectionTimeoutMs,
retryPolicy
);
client.start();
}
/*
* 实现分布式锁
*/
public void highAvailable(){
// 1.连接 Zookeeper 客户端
this.start();
// 2.向 zookeeper 注册自己
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
// 3.获取锁
lock.acquire();
// 4.将自己信息注册到 leader 路径
client.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(leaderPath);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.4.修改代码com.zendesk.maxwell.MaxwellConfig
代码语言:javascript复制// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;
// 函数 MaxwellOptionParser 新增代码
parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
.withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
.withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
.withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
.withRequiredArg();
parser.accepts( "max_retries", "max retry times" )
.withRequiredArg();
// 函数 setup 新增代码
this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode){
if (zookeeperServers == null){
LOGGER.warn("you must specify --zookeeper because you want to use maxwell in ha mode");
}
}
4.说明
❝需要修改源代码是基于 1.29.2 完成对源代码的相关版本,使用高版本,按照相同的步骤对源代码进行修改。 ❞