系列文章链接
- jetcd实战之一:极速体验
- jetcd实战之二:基本操作
- jetcd实战之三:进阶操作(事务、监听、租约)
本篇概览
- 本篇是《jetcd实战系列》的第三篇,前面熟悉了jetcd的基本操作,今天就来了解jetcd如何使用etcd的更多特性,本篇由以下几部分组成:
- 新建模块:在《jetcd实战系列》的jetcd-tutorials项目中新建名为advanced-operate的模块,本篇的源码都写在这个模块中;
- 事务:用jetcd实现事务,将多个操作在同一个事务中完成;
- 监听:对指定key的相关事件进行监听;
- 租约:对指定key绑定一个租约,需要不停的续租才能保证该key有效;
源码下载
- 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
- 这个git项目中有多个文件夹,kubebuilder相关的应用在jetcd-tutorials文件夹下,如下图红框所示:
- jetcd-tutorials文件夹下有多个子项目,本篇的是advanced-operate:
新建advanced-operate模块
- 对于监听和租约这两个特性,用单元测试的方式难以完成,因此今天的代码会用SpringBoot框架运行起来,然后咱们通过web页面和控制台命令来验证相关功能;
- 在jetcd-tutorials项目中新建名为advanced-operate的Gradle模块,其build.gradle内容如下,可见插件用的是org.springframework.boot,还把上一篇文章中的base-operate工程拿来做二方库了:
plugins {
id 'org.springframework.boot'
}
// 用了插件org.springframework.boot之后,jar task会失效,可用bootJar取代
bootJar {
archiveBaseName = project.name
archiveVersion = project.version
manifest {
attributes(
'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),
'Built-By': 'travis',
'Build-Date': buildDate,
'Build-Time': buildTime,
'Built-OS': "${System.properties['os.name']}",
'Specification-Title': project.name,
'Specification-Version': projectVersion,
'Specification-Vendor': 'Will Zhao',
'Implementation-Title': project.name,
'Implementation-Version': projectVersion,
'Implementation-Vendor': 'Will Zhao'
)
}
}
// 子模块自己的依赖
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
// 二方库依赖
implementation project(':base-operate')
// annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
annotationProcessor 'org.projectlombok:lombok'
implementation 'commons-collections:commons-collections'
implementation 'org.apache.commons:commons-lang3'
testImplementation('org.junit.jupiter:junit-jupiter')
}
test {
useJUnitPlatform()
}
- 新增SpringBoot的启动类AdvancedOperateApplication,这里面定义了两个bean:etcdService和advancedEtcdService,前者是上一篇文章中的服务类,后者稍后就来创建 :
package com.bolingcavalry;
import com.bolingcavalry.dao.AdvancedEtcdService;
import com.bolingcavalry.dao.EtcdService;
import com.bolingcavalry.dao.impl.AdvancedEtcdServiceImpl;
import com.bolingcavalry.dao.impl.EtcdServiceImpl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@SpringBootApplication
public class AdvancedOperateApplication {
private static final String IP = "192.168.133.218";
public static final String endpoints = "http://" IP ":2379,http://" IP ":2380,http://" IP ":2381";
@Bean
public EtcdService getEtcdService(){
return new EtcdServiceImpl(endpoints);
}
@Bean
public AdvancedEtcdService getAdvancedEtcdService(){
return new AdvancedEtcdServiceImpl(endpoints);
}
public static void main(String[] args) {
SpringApplication.run(AdvancedOperateApplication.class, args);
}
}
事务
- 新建好模块后,首先体验的是etcd的事务特性,我这里选择用来展示事务的例子是CAS(Compare And Set),即用jetcd来实现CAS;
- 关于CAS有个经典实现,就是AtomicInteger类的compareAndSet方法,官方文档如下图,地址是:https://docs.oracle.com/javase/8/docs/api/
- 接下来开始编码,新建接口AdvancedEtcdService,本篇所需的服务都来自这里的定义,先定义cas接口如下:
package com.bolingcavalry.dao;
import io.etcd.jetcd.Watch;
/**
* @Description: Etcd高级操作的服务接口
* @author: willzhao E-mail: zq2599@gmail.com
* @date: 2021/4/4 8:21
*/
public interface AdvancedEtcdService {
/**
* 乐观锁,指定key的当前值如果等于expectValue,就设置成updateValue
* @param key 键
* @param expectValue 期望值
* @param updateValue 达到期望值时要设置的值
*/
boolean cas(String key, String expectValue, String updateValue) throws Exception;
/**
* 关闭,释放资源
*/
void close();
}
- AdvancedEtcdService的实现类AdvancedEtcdServiceImpl,重点关注cas方法的实现,txn方法表示事务开始,Cmp对象封装了做比较的逻辑,接下来的If、Then等方法规定了比较成功后的逻辑(您还可以选择使用Else方法),最后用commit方法将操作一次性提交:
package com.bolingcavalry.dao.impl;
import com.bolingcavalry.dao.AdvancedEtcdService;
import io.etcd.jetcd.*;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.PutOption;
import io.grpc.stub.CallStreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Charsets.UTF_8;
/**
* @Description: Etcd高级操作的服务接口实现
* @author: willzhao E-mail: zq2599@gmail.com
* @date: 2021/4/4 8:23
*/
@Slf4j
public class AdvancedEtcdServiceImpl implements AdvancedEtcdService {
private Client client;
private Object lock = new Object();
private String endpoints;
public AdvancedEtcdServiceImpl(String endpoints) {
super();
this.endpoints = endpoints;
}
/**
* 将字符串转为客户端所需的ByteSequence实例
* @param val
* @return
*/
public static ByteSequence bytesOf(String val) {
return ByteSequence.from(val, UTF_8);
}
private Client getClient() {
if (null==client) {
synchronized (lock) {
if (null==client) {
client = Client.builder().endpoints(endpoints.split(",")).build();
}
}
}
return client;
}
@Override
public boolean cas(String key, String expectValue, String updateValue) throws Exception {
// 将三个String型的入参全部转成ByteSequence类型
ByteSequence bsKey = bytesOf(key);
ByteSequence bsExpectValue = bytesOf(expectValue);
ByteSequence bsUpdateValue = bytesOf(updateValue);
// 是否相等的比较
Cmp cmp = new Cmp(bsKey, Cmp.Op.EQUAL, CmpTarget.value(bsExpectValue));
// 执行事务
TxnResponse txnResponse = getClient().getKVClient()
.txn()
.If(cmp)
.Then(Op.put(bsKey, bsUpdateValue, PutOption.DEFAULT))
.commit()
.get();
// 如果操作成功,isSucceeded方法会返回true,并且PutResponse也有内容
return txnResponse.isSucceeded() && CollectionUtils.isNotEmpty(txnResponse.getPutResponses());
}
@Override
public void close() {
getClient().close();
client = null;
}
}
- 验证上述cas功能的方法并不复杂,用单元测试即可完成,新增单元测试类AdvancedEtcdServiceImplTest,里面的cas方法就是测试方法:
package com.bolingcavalry.dao.impl;
import com.bolingcavalry.AdvancedOperateApplication;
import com.bolingcavalry.dao.AdvancedEtcdService;
import com.bolingcavalry.dao.EtcdService;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import static org.junit.jupiter.api.Assertions.*;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class AdvancedEtcdServiceImplTest {
// 为了便于维护,不轮是单元测试还是springboot应用,endpoint都来自AdvancedOperateApplication.endpoints这个变量
private static final String endpoints = AdvancedOperateApplication.endpoints;
private static EtcdService etcdService = new EtcdServiceImpl(endpoints);
private static AdvancedEtcdService advancedEtcdService = new AdvancedEtcdServiceImpl(endpoints);
private static String key(String name) {
return "/AdvancedEtcdServiceImplTest/" name "-" System.currentTimeMillis();
}
@AfterAll
static void close() {
etcdService.close();
advancedEtcdService.close();
}
@Test
void cas() throws Exception {
// 本次要操作的键
String key = key("cas");
// 初始值,在cas操作之前的值
String firstValue = "aaa";
// 第一次cas操作时要写入的值
String secondValue = "bbb";
// 第二次cas操作时要写入的值
String thirdValue = "ccc";
// cas操作前,将值写为"aaa"
etcdService.put(key, firstValue);
// 第一次cas操作,查出的值如果等于"aaa",就将其改为"bbb"
// 此时因为值等于"aaa",所以cas操作成功,值被改为"bbb"
boolean casRlt = advancedEtcdService.cas(key, firstValue, secondValue);
// 更新成功返回true
assertTrue(casRlt);
// 通过查询来验证值已经更新为"bbb"
assertEquals(secondValue, etcdService.getSingle(key));
// 第二次case操作,查出的值如果等于"aaa",就将其改为"bbb"
// 此时因为值等于"bbb",和期望值"aaa"不想等,因此cas失败,没有发生任何写操作,值还是"bbb"
casRlt = advancedEtcdService.cas(key, firstValue, thirdValue);
// cas失败就会返回false
assertFalse(casRlt);
// 确认最新的值还是上次更新的"bbb",没有被更新为"ccc"
assertEquals(secondValue, etcdService.getSingle(key));
}
}
- 上述测试代码一共有三步:首先写入初值aaa,然后用cas将aaa更新为bbb,最后再尝试一次cas,不过这时候的期望值还是aaa,自然是无法通过对比的,因此第二次cas失败,值还是bbb;
- 操作如下,可见单元测试通过:
监听
- 接下来学习jetcd提供的监听能力,这个功能的体验需要手动操作,因此单元测试就不合适了,咱们用web接口和控制台命令行结合的方式来操作;
- 在AdvancedEtcdService.java中新增watch方法的定义,入参是指定的key,以及调用方定制的监听实现,返回值Watcher是jetcd对监听事件的封装,调用方可以用Watcher来结束监听:
/**
* 为指定key添加监听
* @param key 键
* @param listener 监听事件
* @return jetcd对应的监听对象
* @throws Exception
*/
Watch.Watcher watch(String key, Watch.Listener listener) throws Exception;
- 在AdvancedEtcdServiceImpl.java中新增watch方法:
@Override
public Watch.Watcher watch(String key, Watch.Listener listener) throws Exception {
return getClient().getWatchClient().watch(bytesOf(key), listener);
}
- 新增一个web接口,该接口收到key,然后调用AdvancedEtcdServiceImpl.watch方法对key进行监听,完整的代码如下,可见watcherMap中保存了监听对象,业务方可以随时从中取出来做关闭操作:
package com.bolingcavalry.controller;
import com.bolingcavalry.dao.AdvancedEtcdService;
import com.bolingcavalry.dao.EtcdService;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.watch.WatchEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import static java.nio.charset.StandardCharsets.UTF_8;
@RestController
@Slf4j
public class WatchController {
@Autowired
EtcdService etcdService;
@Autowired
AdvancedEtcdService advancedEtcdService;
private Map<String, Watch.Watcher> watcherMap = new ConcurrentHashMap<>();
@RequestMapping(value = "/watch/{key}", method = RequestMethod.GET)
public String watch(@PathVariable("key") String key) throws Exception {
// 先检查指定的key在etcd中是否存在
// 查询条件中指定只返回key
GetOption getOption = GetOption.newBuilder().withCountOnly(true).build();
// 如果数量小于1,表示指定的key在etcd中不存在
if (etcdService.getRange(key, getOption).getCount()<1) {
String errorDesc = String.format("[%s] not exists", key);
log.error(errorDesc);
return errorDesc " " new Date();
}
final String watchKey = key;
// 实例化一个监听对象,当监听的key发生变化时会被调用
Watch.Listener listener = Watch.listener(watchResponse -> {
log.info("收到[{}]的事件", watchKey);
// 被调用时传入的是事件集合,这里遍历每个事件
watchResponse.getEvents().forEach(watchEvent -> {
// 操作类型
WatchEvent.EventType eventType = watchEvent.getEventType();
// 操作的键值对
KeyValue keyValue = watchEvent.getKeyValue();
log.info("type={}, key={}, value={}",
eventType,
keyValue.getKey().toString(UTF_8),
keyValue.getValue().toString(UTF_8));
// 如果是删除操作,就把该key的Watcher找出来close掉
if (WatchEvent.EventType.DELETE.equals(eventType)
&& watcherMap.containsKey(watchKey)) {
Watch.Watcher watcher = watcherMap.remove(watchKey);
watcher.close();
}
});
});
// 添加监听
Watch.Watcher watcher = advancedEtcdService.watch(watchKey, listener);
// 将这个Watcher放入内存中保存,如果该key被删除就要将这个Watcher关闭
watcherMap.put(key, watcher);
return "watch success " new Date();
}
}
- 代码中已经添加了详细的注释,这里就不多解释了,把这个springboot应用运行起来试试,在浏览器访问http://localhost:8080/watch/abc,其中abc是个etcd中不存在的key,此时会收到提示说key不存在:
- 在控制台用命令行新增abc这个key,我这里etcd是部署在docker上的,参考命令如下:
docker exec 27_etcd1_1 /usr/local/bin/etcdctl put abc 111
- 再次访问http://localhost:8080/watch/abc,提示操作成功:
- 在控制台对abc先做一次修改,再删除:
[root@centos7 ~]# docker exec 27_etcd1_1 /usr/local/bin/etcdctl put abc 222
OK
[root@centos7 ~]# docker exec 27_etcd1_1 /usr/local/bin/etcdctl del abc
1
- 此时在springboot的控制台页面,可见监听的方法被执行了,修改和删除的具体内容都被打印出来:
租约
- 接下来学习租约特性,这是个很实用的功能,如果应用A负责维护某个key的续租,当应用A出现问题无法续租时则该key就会过期,这样其他应用只要检查该key是否存在就知道应用A是否正常了,这就相当于简易的服务注册中心功能了;
- jetcd的租约操作很容易实现,基本步骤是先创建一个租约并指定TTL(time-to-live),创建租约成功后可以拿到租约ID,对指定的key做put操作时带上这个租约ID,这样这个key就会在TTL后过期,只有及时续租才能保证key不过期,续租有两种:无限自动和一次性:
- 接下来编码体验租约功能,先在AdvancedEtcdService.java中增加方法定义:
/**
* 带无限续租的写操作
* @param key 键
* @param value 值
* @throws Exception
*/
void putWithLease(String key, String value) throws Exception;
- 接下来,在AdvancedEtcdServiceImpl.java中实现该方法,可见keepAlive的入参需要指定一个回调用来完成续租成功后的操作,该回调中的方法有点多:
@Override
public void putWithLease(String key, String value) throws Exception {
AtomicInteger a;
Lease leaseClient = getClient().getLeaseClient();
leaseClient.grant(60)
.thenAccept(result -> {
// 租约ID
long leaseId = result.getID();
log.info("[{}]申请租约成功,租约ID [{}]", key, Long.toHexString(leaseId));
// 准备好put操作的client
KV kvClient = getClient().getKVClient();
// put操作时的可选项,在这里指定租约ID
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
// put操作
kvClient.put(bytesOf(key), bytesOf(value), putOption)
.thenAccept(putResponse -> {
// put操作完成后,再设置无限续租的操作
leaseClient.keepAlive(leaseId, new CallStreamObserver<LeaseKeepAliveResponse>() {
@Override
public boolean isReady() {
return false;
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
}
@Override
public void disableAutoInboundFlowControl() {
}
@Override
public void request(int count) {
}
@Override
public void setMessageCompression(boolean enable) {
}
/**
* 每次续租操作完成后,该方法都会被调用
* @param value
*/
@Override
public void onNext(LeaseKeepAliveResponse value) {
log.info("[{}]续租完成,TTL[{}]", Long.toHexString(leaseId), value.getTTL());
}
@Override
public void onError(Throwable t) {
log.error("onError", t);
}
@Override
public void onCompleted() {
log.info("onCompleted");
}
});
});
});
}
- 开发web接口,收到请求后调用上面的租约服务,为指定的key做带有租约的put操作:
package com.bolingcavalry.controller;
import com.bolingcavalry.dao.AdvancedEtcdService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
@Slf4j
public class LeaseController {
@Autowired
AdvancedEtcdService advancedEtcdService;
@RequestMapping(value = "/lease/{key}/{value}", method = RequestMethod.GET)
public String lease(@PathVariable("key") String key, @PathVariable("value") String value) throws Exception {
advancedEtcdService.putWithLease(key, value);
return "lease success " new Date();
}
}
- 编码完成后将应用启动,浏览器访问http://localhost:8080/lease/bbb/123,即可为bbb这个key创建无限自动续租的租约;
- 在springboot应用的控制台上可见续租成功的日志:
2021-04-05 13:13:59.490 INFO 16472 --- [pool-1-thread-2] c.b.dao.impl.AdvancedEtcdServiceImpl : [bbb]申请租约成功,租约ID [5853789fddbff86e]
2021-04-05 13:13:59.930 INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl : [5853789fddbff86e]续租完成,TTL[60]
2021-04-05 13:14:15.930 INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl : [5853789fddbff86b]续租完成,TTL[60]
2021-04-05 13:14:20.430 INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl : [5853789fddbff86e]续租完成,TTL[60]
2021-04-05 13:14:36.434 INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl : [5853789fddbff86b]续租完成,TTL[60]
2021-04-05 13:14:40.930 INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl : [5853789fddbff86e]续租完成,TTL[60]
- 从上述日志可见,租约ID是5853789fddbff86e,用以下命令查看租约的最新情况:
docker exec 27_etcd1_1 /usr/local/bin/etcdctl lease timetolive 5853789fddbff86e --keys
- 此时停掉springboot进程,再用上述命令查看租约信息,如下图,可见没有续租后,key的有效时间不会再延长,超过60秒后该key无效:
- 至此,通过jetcd使用etcd事务、监听、租约等功能的操作就全部完成了,如果您正在使用这些功能,希望本文能给您一些参考;