jetcd实战之三:进阶操作(事务、监听、租约)

2021-12-07 08:21:11 浏览数 (1)

系列文章链接

  1. jetcd实战之一:极速体验
  2. jetcd实战之二:基本操作
  3. jetcd实战之三:进阶操作(事务、监听、租约)

本篇概览

  • 本篇是《jetcd实战系列》的第三篇,前面熟悉了jetcd的基本操作,今天就来了解jetcd如何使用etcd的更多特性,本篇由以下几部分组成:
  1. 新建模块:在《jetcd实战系列》的jetcd-tutorials项目中新建名为advanced-operate的模块,本篇的源码都写在这个模块中;
  2. 事务:用jetcd实现事务,将多个操作在同一个事务中完成;
  3. 监听:对指定key的相关事件进行监听;
  4. 租约:对指定key绑定一个租约,需要不停的续租才能保证该key有效;

源码下载

  • 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称

链接

备注

项目主页

https://github.com/zq2599/blog_demos

该项目在GitHub上的主页

git仓库地址(https)

https://github.com/zq2599/blog_demos.git

该项目源码的仓库地址,https协议

git仓库地址(ssh)

git@github.com:zq2599/blog_demos.git

该项目源码的仓库地址,ssh协议

  • 这个git项目中有多个文件夹,kubebuilder相关的应用在jetcd-tutorials文件夹下,如下图红框所示:
  • jetcd-tutorials文件夹下有多个子项目,本篇的是advanced-operate:

新建advanced-operate模块

  • 对于监听和租约这两个特性,用单元测试的方式难以完成,因此今天的代码会用SpringBoot框架运行起来,然后咱们通过web页面和控制台命令来验证相关功能;
  • 在jetcd-tutorials项目中新建名为advanced-operate的Gradle模块,其build.gradle内容如下,可见插件用的是org.springframework.boot,还把上一篇文章中的base-operate工程拿来做二方库了:
代码语言:javascript复制
plugins {
    id 'org.springframework.boot'
}

// 用了插件org.springframework.boot之后,jar task会失效,可用bootJar取代
bootJar {
    archiveBaseName = project.name
    archiveVersion = project.version

    manifest {
        attributes(
                'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),
                'Built-By': 'travis',
                'Build-Date': buildDate,
                'Build-Time': buildTime,
                'Built-OS': "${System.properties['os.name']}",
                'Specification-Title': project.name,
                'Specification-Version': projectVersion,
                'Specification-Vendor': 'Will Zhao',
                'Implementation-Title': project.name,
                'Implementation-Version': projectVersion,
                'Implementation-Vendor': 'Will Zhao'
        )
    }
}

// 子模块自己的依赖
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    // 二方库依赖
    implementation project(':base-operate')
    // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
    annotationProcessor 'org.projectlombok:lombok'
    implementation 'commons-collections:commons-collections'
    implementation 'org.apache.commons:commons-lang3'
    testImplementation('org.junit.jupiter:junit-jupiter')
}

test {
    useJUnitPlatform()
}
  • 新增SpringBoot的启动类AdvancedOperateApplication,这里面定义了两个bean:etcdService和advancedEtcdService,前者是上一篇文章中的服务类,后者稍后就来创建 :
代码语言:javascript复制
package com.bolingcavalry;

import com.bolingcavalry.dao.AdvancedEtcdService;
import com.bolingcavalry.dao.EtcdService;
import com.bolingcavalry.dao.impl.AdvancedEtcdServiceImpl;
import com.bolingcavalry.dao.impl.EtcdServiceImpl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@SpringBootApplication
public class AdvancedOperateApplication {

    private static final String IP = "192.168.133.218";

    public static final String endpoints = "http://"   IP   ":2379,http://"   IP   ":2380,http://"   IP   ":2381";


    @Bean
    public EtcdService getEtcdService(){
        return new EtcdServiceImpl(endpoints);
    }

    @Bean
    public AdvancedEtcdService getAdvancedEtcdService(){
        return new AdvancedEtcdServiceImpl(endpoints);
    }

    public static void main(String[] args) {
        SpringApplication.run(AdvancedOperateApplication.class, args);
    }
}

事务

  • 新建好模块后,首先体验的是etcd的事务特性,我这里选择用来展示事务的例子是CAS(Compare And Set),即用jetcd来实现CAS;
  • 关于CAS有个经典实现,就是AtomicInteger类的compareAndSet方法,官方文档如下图,地址是:https://docs.oracle.com/javase/8/docs/api/
  • 接下来开始编码,新建接口AdvancedEtcdService,本篇所需的服务都来自这里的定义,先定义cas接口如下:
代码语言:javascript复制
package com.bolingcavalry.dao;

import io.etcd.jetcd.Watch;

/**
 * @Description: Etcd高级操作的服务接口
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2021/4/4 8:21
 */
public interface AdvancedEtcdService {

    /**
     * 乐观锁,指定key的当前值如果等于expectValue,就设置成updateValue
     * @param key           键
     * @param expectValue   期望值
     * @param updateValue   达到期望值时要设置的值
     */
    boolean cas(String key, String expectValue, String updateValue) throws Exception;
    
    /**
     * 关闭,释放资源
     */
    void close();
}
  • AdvancedEtcdService的实现类AdvancedEtcdServiceImpl,重点关注cas方法的实现,txn方法表示事务开始,Cmp对象封装了做比较的逻辑,接下来的If、Then等方法规定了比较成功后的逻辑(您还可以选择使用Else方法),最后用commit方法将操作一次性提交:
代码语言:javascript复制
package com.bolingcavalry.dao.impl;

import com.bolingcavalry.dao.AdvancedEtcdService;
import io.etcd.jetcd.*;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.PutOption;
import io.grpc.stub.CallStreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;

import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.base.Charsets.UTF_8;

/**
 * @Description: Etcd高级操作的服务接口实现
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2021/4/4 8:23
 */
@Slf4j
public class AdvancedEtcdServiceImpl implements AdvancedEtcdService {

    private Client client;

    private Object lock = new Object();

    private String endpoints;

    public AdvancedEtcdServiceImpl(String endpoints) {
        super();
        this.endpoints = endpoints;
    }

    /**
     * 将字符串转为客户端所需的ByteSequence实例
     * @param val
     * @return
     */
    public static ByteSequence bytesOf(String val) {
        return ByteSequence.from(val, UTF_8);
    }

    private Client getClient() {
        if (null==client) {
            synchronized (lock) {
                if (null==client) {
                    client = Client.builder().endpoints(endpoints.split(",")).build();
                }
            }
        }

        return client;
    }

    @Override
    public boolean cas(String key, String expectValue, String updateValue) throws Exception {
        // 将三个String型的入参全部转成ByteSequence类型
        ByteSequence bsKey = bytesOf(key);
        ByteSequence bsExpectValue = bytesOf(expectValue);
        ByteSequence bsUpdateValue = bytesOf(updateValue);

        // 是否相等的比较
        Cmp cmp = new Cmp(bsKey, Cmp.Op.EQUAL, CmpTarget.value(bsExpectValue));

        // 执行事务
        TxnResponse txnResponse = getClient().getKVClient()
                                .txn()
                                .If(cmp)
                                .Then(Op.put(bsKey, bsUpdateValue, PutOption.DEFAULT))
                                .commit()
                                .get();

        // 如果操作成功,isSucceeded方法会返回true,并且PutResponse也有内容
        return txnResponse.isSucceeded() && CollectionUtils.isNotEmpty(txnResponse.getPutResponses());
    }

    @Override
    public void close() {
        getClient().close();
        client = null;
    }
}
  • 验证上述cas功能的方法并不复杂,用单元测试即可完成,新增单元测试类AdvancedEtcdServiceImplTest,里面的cas方法就是测试方法:
代码语言:javascript复制
package com.bolingcavalry.dao.impl;

import com.bolingcavalry.AdvancedOperateApplication;
import com.bolingcavalry.dao.AdvancedEtcdService;
import com.bolingcavalry.dao.EtcdService;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import static org.junit.jupiter.api.Assertions.*;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class AdvancedEtcdServiceImplTest {

    // 为了便于维护,不轮是单元测试还是springboot应用,endpoint都来自AdvancedOperateApplication.endpoints这个变量
    private static final String endpoints = AdvancedOperateApplication.endpoints;

    private static EtcdService etcdService = new EtcdServiceImpl(endpoints);

    private static AdvancedEtcdService advancedEtcdService = new AdvancedEtcdServiceImpl(endpoints);

    private static String key(String name) {
        return "/AdvancedEtcdServiceImplTest/"   name   "-"   System.currentTimeMillis();
    }

    @AfterAll
    static void close() {
        etcdService.close();
        advancedEtcdService.close();
    }

    @Test
    void cas() throws Exception {
        // 本次要操作的键
        String key = key("cas");

        // 初始值,在cas操作之前的值
        String firstValue = "aaa";

        // 第一次cas操作时要写入的值
        String secondValue = "bbb";

        // 第二次cas操作时要写入的值
        String thirdValue = "ccc";

        // cas操作前,将值写为"aaa"
        etcdService.put(key, firstValue);

        // 第一次cas操作,查出的值如果等于"aaa",就将其改为"bbb"
        // 此时因为值等于"aaa",所以cas操作成功,值被改为"bbb"
        boolean casRlt = advancedEtcdService.cas(key, firstValue, secondValue);

        // 更新成功返回true
        assertTrue(casRlt);

        // 通过查询来验证值已经更新为"bbb"
        assertEquals(secondValue, etcdService.getSingle(key));

        // 第二次case操作,查出的值如果等于"aaa",就将其改为"bbb"
        // 此时因为值等于"bbb",和期望值"aaa"不想等,因此cas失败,没有发生任何写操作,值还是"bbb"
        casRlt = advancedEtcdService.cas(key, firstValue, thirdValue);

        // cas失败就会返回false
        assertFalse(casRlt);

        // 确认最新的值还是上次更新的"bbb",没有被更新为"ccc"
        assertEquals(secondValue, etcdService.getSingle(key));
    }
}
  • 上述测试代码一共有三步:首先写入初值aaa,然后用cas将aaa更新为bbb,最后再尝试一次cas,不过这时候的期望值还是aaa,自然是无法通过对比的,因此第二次cas失败,值还是bbb;
  • 操作如下,可见单元测试通过:

监听

  • 接下来学习jetcd提供的监听能力,这个功能的体验需要手动操作,因此单元测试就不合适了,咱们用web接口和控制台命令行结合的方式来操作;
  • 在AdvancedEtcdService.java中新增watch方法的定义,入参是指定的key,以及调用方定制的监听实现,返回值Watcher是jetcd对监听事件的封装,调用方可以用Watcher来结束监听:
代码语言:javascript复制
    /**
     * 为指定key添加监听
     * @param key       键
     * @param listener  监听事件
     * @return          jetcd对应的监听对象
     * @throws Exception
     */
    Watch.Watcher watch(String key, Watch.Listener listener) throws Exception;
  • 在AdvancedEtcdServiceImpl.java中新增watch方法:
代码语言:javascript复制
    @Override
    public Watch.Watcher watch(String key, Watch.Listener listener) throws Exception {
        return getClient().getWatchClient().watch(bytesOf(key), listener);
    }
  • 新增一个web接口,该接口收到key,然后调用AdvancedEtcdServiceImpl.watch方法对key进行监听,完整的代码如下,可见watcherMap中保存了监听对象,业务方可以随时从中取出来做关闭操作:
代码语言:javascript复制
package com.bolingcavalry.controller;

import com.bolingcavalry.dao.AdvancedEtcdService;
import com.bolingcavalry.dao.EtcdService;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.watch.WatchEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import static java.nio.charset.StandardCharsets.UTF_8;

@RestController
@Slf4j
public class WatchController {

    @Autowired
    EtcdService etcdService;

    @Autowired
    AdvancedEtcdService advancedEtcdService;

    private Map<String, Watch.Watcher> watcherMap = new ConcurrentHashMap<>();

    @RequestMapping(value = "/watch/{key}", method = RequestMethod.GET)
    public String watch(@PathVariable("key") String key) throws Exception {
        // 先检查指定的key在etcd中是否存在

        // 查询条件中指定只返回key
        GetOption getOption = GetOption.newBuilder().withCountOnly(true).build();
        // 如果数量小于1,表示指定的key在etcd中不存在
        if (etcdService.getRange(key, getOption).getCount()<1) {
            String errorDesc = String.format("[%s] not exists", key);
            log.error(errorDesc);
            return errorDesc   " "   new Date();
        }

        final String watchKey = key;

        // 实例化一个监听对象,当监听的key发生变化时会被调用
        Watch.Listener listener = Watch.listener(watchResponse -> {
            log.info("收到[{}]的事件", watchKey);

            // 被调用时传入的是事件集合,这里遍历每个事件
            watchResponse.getEvents().forEach(watchEvent -> {
                // 操作类型
                WatchEvent.EventType eventType = watchEvent.getEventType();

                // 操作的键值对
                KeyValue keyValue = watchEvent.getKeyValue();

                log.info("type={}, key={}, value={}",
                            eventType,
                            keyValue.getKey().toString(UTF_8),
                            keyValue.getValue().toString(UTF_8));

                // 如果是删除操作,就把该key的Watcher找出来close掉
                if (WatchEvent.EventType.DELETE.equals(eventType)
                && watcherMap.containsKey(watchKey)) {
                    Watch.Watcher watcher = watcherMap.remove(watchKey);
                    watcher.close();
                }
            });
        });

        // 添加监听
        Watch.Watcher watcher = advancedEtcdService.watch(watchKey, listener);

        // 将这个Watcher放入内存中保存,如果该key被删除就要将这个Watcher关闭
        watcherMap.put(key, watcher);

        return "watch success "   new Date();
    }
}
  • 代码中已经添加了详细的注释,这里就不多解释了,把这个springboot应用运行起来试试,在浏览器访问http://localhost:8080/watch/abc,其中abc是个etcd中不存在的key,此时会收到提示说key不存在:
  • 在控制台用命令行新增abc这个key,我这里etcd是部署在docker上的,参考命令如下:
代码语言:javascript复制
docker exec 27_etcd1_1 /usr/local/bin/etcdctl put abc 111
  • 再次访问http://localhost:8080/watch/abc,提示操作成功:
  • 在控制台对abc先做一次修改,再删除:
代码语言:javascript复制
[root@centos7 ~]# docker exec 27_etcd1_1 /usr/local/bin/etcdctl put abc 222
OK
[root@centos7 ~]# docker exec 27_etcd1_1 /usr/local/bin/etcdctl del abc
1
  • 此时在springboot的控制台页面,可见监听的方法被执行了,修改和删除的具体内容都被打印出来:

租约

  • 接下来学习租约特性,这是个很实用的功能,如果应用A负责维护某个key的续租,当应用A出现问题无法续租时则该key就会过期,这样其他应用只要检查该key是否存在就知道应用A是否正常了,这就相当于简易的服务注册中心功能了;
  • jetcd的租约操作很容易实现,基本步骤是先创建一个租约并指定TTL(time-to-live),创建租约成功后可以拿到租约ID,对指定的key做put操作时带上这个租约ID,这样这个key就会在TTL后过期,只有及时续租才能保证key不过期,续租有两种:无限自动和一次性:
  • 接下来编码体验租约功能,先在AdvancedEtcdService.java中增加方法定义:
代码语言:javascript复制
    /**
     * 带无限续租的写操作
     * @param key   键
     * @param value 值
     * @throws Exception
     */
    void putWithLease(String key, String value) throws Exception;
  • 接下来,在AdvancedEtcdServiceImpl.java中实现该方法,可见keepAlive的入参需要指定一个回调用来完成续租成功后的操作,该回调中的方法有点多:
代码语言:javascript复制
@Override
    public void putWithLease(String key, String value) throws Exception {
        AtomicInteger a;
        Lease leaseClient = getClient().getLeaseClient();

        leaseClient.grant(60)
                .thenAccept(result -> {

                    // 租约ID
                    long leaseId = result.getID();

                    log.info("[{}]申请租约成功,租约ID [{}]", key, Long.toHexString(leaseId));

                    // 准备好put操作的client
                    KV kvClient = getClient().getKVClient();

                    // put操作时的可选项,在这里指定租约ID
                    PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();

                    // put操作
                    kvClient.put(bytesOf(key), bytesOf(value), putOption)
                            .thenAccept(putResponse -> {
                                // put操作完成后,再设置无限续租的操作
                                leaseClient.keepAlive(leaseId, new CallStreamObserver<LeaseKeepAliveResponse>() {
                                    @Override
                                    public boolean isReady() {
                                        return false;
                                    }

                                    @Override
                                    public void setOnReadyHandler(Runnable onReadyHandler) {

                                    }

                                    @Override
                                    public void disableAutoInboundFlowControl() {

                                    }

                                    @Override
                                    public void request(int count) {
                                    }

                                    @Override
                                    public void setMessageCompression(boolean enable) {

                                    }

                                    /**
                                     * 每次续租操作完成后,该方法都会被调用
                                     * @param value
                                     */
                                    @Override
                                    public void onNext(LeaseKeepAliveResponse value) {
                                        log.info("[{}]续租完成,TTL[{}]", Long.toHexString(leaseId), value.getTTL());
                                    }

                                    @Override
                                    public void onError(Throwable t) {
                                        log.error("onError", t);
                                    }

                                    @Override
                                    public void onCompleted() {
                                        log.info("onCompleted");
                                    }
                                });
                            });
                });
    }
  • 开发web接口,收到请求后调用上面的租约服务,为指定的key做带有租约的put操作:
代码语言:javascript复制
package com.bolingcavalry.controller;

import com.bolingcavalry.dao.AdvancedEtcdService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;

@RestController
@Slf4j
public class LeaseController {

    @Autowired
    AdvancedEtcdService advancedEtcdService;

    @RequestMapping(value = "/lease/{key}/{value}", method = RequestMethod.GET)
    public String lease(@PathVariable("key") String key, @PathVariable("value") String value) throws Exception {
        advancedEtcdService.putWithLease(key, value);
        return "lease success "   new Date();
    }
}
  • 编码完成后将应用启动,浏览器访问http://localhost:8080/lease/bbb/123,即可为bbb这个key创建无限自动续租的租约;
  • 在springboot应用的控制台上可见续租成功的日志:
代码语言:javascript复制
2021-04-05 13:13:59.490  INFO 16472 --- [pool-1-thread-2] c.b.dao.impl.AdvancedEtcdServiceImpl     : [bbb]申请租约成功,租约ID [5853789fddbff86e]
2021-04-05 13:13:59.930  INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl     : [5853789fddbff86e]续租完成,TTL[60]
2021-04-05 13:14:15.930  INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl     : [5853789fddbff86b]续租完成,TTL[60]
2021-04-05 13:14:20.430  INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl     : [5853789fddbff86e]续租完成,TTL[60]
2021-04-05 13:14:36.434  INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl     : [5853789fddbff86b]续租完成,TTL[60]
2021-04-05 13:14:40.930  INFO 16472 --- [ault-executor-0] c.b.dao.impl.AdvancedEtcdServiceImpl     : [5853789fddbff86e]续租完成,TTL[60]
  • 从上述日志可见,租约ID是5853789fddbff86e,用以下命令查看租约的最新情况:
代码语言:javascript复制
docker exec 27_etcd1_1 /usr/local/bin/etcdctl lease timetolive 5853789fddbff86e --keys
  • 此时停掉springboot进程,再用上述命令查看租约信息,如下图,可见没有续租后,key的有效时间不会再延长,超过60秒后该key无效:
  • 至此,通过jetcd使用etcd事务、监听、租约等功能的操作就全部完成了,如果您正在使用这些功能,希望本文能给您一些参考;

0 人点赞