服务监测与redis序列化

2023-10-22 16:20:42 浏览数 (1)

随着服务器数量的增多,部署于不同服务器的服务数量也是稳步增长的,这时候监测每个服务是否正常运转就显得尤为重要。通过监测的结果,可以方便的知道有没有服务下线了,从而采取相应的解决策略。

服务监测的设计

我设计的监测策略如下:

所有服务监测相关的数据存于redis中,每隔5分钟向目标服务端口发送一个socket连接请求,如果连接成功了那么确认服务存活,如果出现异常判定服务异常下线。对于每个服务,可以设置是否启用下线报警(发送邮件通知),如果启用,将在发现下线时,向管理员的邮箱发送一封邮件告知,并且暂且关闭该服务的报警功能(因为设置的监测时间比较密集,如果不暂且关闭的话,一直发送邮件会很快塞满邮箱)。

存储的数据类型是redis的hash类型,对应的java类型为 Map<String, List<DetectedPort>> ,其中key是监测的服务器ip地址,value是一个list集合,该集合每个成员都是一个端口的相关信息(包括了端口号、服务名称、状态、是否警告)。当然因为可以对整个服务器进行监测,所以额外的增加一个hash类型的数据,对应的java类型为 Map<String, Boolean> ,其中key也是ip地址,value是该服务器是否在线。

对于服务监测功能,一共有这样几个api的实现:增加服务器、删除服务器、设置端口、删除端口、更新状态、查询状态。

具体的设计细节如下:

DetectedPort 类 :被监测的端口

代码语言:javascript复制
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DetectedPort {
    private Integer port;
    private String name;
    /**
     * 判断当前应用是否在线
     */
    private Boolean up;
    /**
     * 当服务下线时是否通知
     */
    private Boolean alert;
}

DetectedServer 类 : 被监测的服务器

代码语言:javascript复制
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DetectedServer {
    /**
     * 判断当前服务器是否在线
     */
    private String ip;
    private Boolean up;  // 一个服务器可以有多个端口号
    private List<DetectedPort> ports; 
}

DsapUtil 类 :监测工具类,用来监测服务和端口是否可以连接

代码语言:javascript复制
public class DsapUtil {
    private static final int TIMEOUT = 500;

    public static boolean serverDetect(String host){
        try {
            return InetAddress.getByName(host).isReachable(TIMEOUT);
        } catch (Exception e) {
            return false;
        }
    }
    public static boolean portDetect(String host, int port){
        Socket socket = null;
        try {
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port), TIMEOUT);
            return true;
        } catch (Exception e){
            return false;
        } finally {
            if (socket!=null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

对于实际的service服务,在设计上没有考虑线程安全问题,一方面是因为该服务检测相关的api的访问都是管理员身份鉴权通过才能被允许的,正常的操作流程是不会出现安全问题的,另一方面,嗯,因为懒(误打QAQ)。

DsapServiceImpl 类 :DetectedServerAndPort的缩写,提供了api所需要的6个服务实现

  • 只要是增删改类型的api,在返回前都进行一轮状态更新 updateAll()。
  • selectAll方法有两个,一个是private访问权限的 selectAll0(),实际的进行了redis数据库的查询,并且将处理后的数据返回,而提供给api访问的public访问权限的selectAll(),是先从redis缓存中取直接结果,没有取到的的话说明是第一次访问,走一遍updateAll后再一次从redis取结果就一定存在了。
  • addNewPort 方法,因为设计上的value是一个json字符串,无法直接插入一个端口,所以必要的先从redis中取出该ip的端口信息json字符串。然而设置的序列化方式是 Jackson2JsonRedisSerializer ,ops.get()得到的数据类型实际是 ArrayList<LinkedHashMap> ,通过类型修正得到真正的List<DetectedPort>类型集合(后文详细解释这里的处理流程),因为要保证端口的唯一性,而结果list是无法去重的,所以用list.removeIf方法先把已经存在的该port数据删除,然后再把当前新的数据加入进来。最后通过ops.put把该ip的value替换成新的结果。
  • addServer 方法,相比于addNewPort方法就比较简单了,如果发现添加了相同的,直接替换即可。
代码语言:javascript复制
@Service
public class DsapServiceImpl implements DsapService {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private AsyncService asyncService;

    private static final String DSAP_PORTS = "DSAP-PORT";
    private static final String DSAP_SERVER = "DSAP-SERVER";
    /**
     * dsap状态缓存
     */
    private static final String DSAP_CACHE = "DSAP-CACHE";

    /**
     * 添加一个端口
     * @param dp 端口信息
     * @param ip ip
     * @return 1:up;-1:down
     */
    @Override
    public RetResult<Integer> addNewPort(DetectedPort dp, String ip) {
        boolean pd = DsapUtil.portDetect(ip, dp.getPort());
        dp.setUp(pd);

        HashOperations<String, String, List<DetectedPort>> ops = redisTemplate.opsForHash();

        // 获取当前ip的端口列表
        List<DetectedPort> ports = ops.get(DSAP_PORTS, ip);

        // 添加当前新的端口到列表
        if (ports == null) ports = new ArrayList<>();
        ports = transfer(ports);
        ports.removeIf(port -> port.getPort().equals(dp.getPort()));
        ports.add(dp);

        // 设置到hash中
        ops.put(DSAP_PORTS, ip, ports);

        // 更新服务器装填
        HashOperations<String, String, Boolean> ops2 = redisTemplate.opsForHash();
        boolean sd = DsapUtil.serverDetect(ip);
        ops2.put(DSAP_SERVER, ip, sd);

        updateAll();
        return RetResult.success(pd?1:-1);
    }

    /**
     * 删除指定的端口,并且返回缓存的状态
     * @param ip ip
     * @param port port
     * @return ret
     */
    @Override
    public RetResult<DetectedServer> delPort(String ip, int port) {
        // 获取server装填
        HashOperations<String,String,Boolean> ops1 = redisTemplate.opsForHash();
        Boolean up = ops1.get(DSAP_SERVER, ip);
        if (up == null) return RetResult.fail(ip   "不存在的记录");

        HashOperations<String, String, List<DetectedPort>> ops = redisTemplate.opsForHash();
        List<DetectedPort> ports = ops.get(DSAP_PORTS, ip);

        // 删除指定监听的端口
        if (ports == null) ports = new ArrayList<>();
        ports = transfer(ports);
        ports.removeIf(_port -> _port.getPort() == port);
        ops.put(DSAP_PORTS, ip, ports);

        updateAll();
        return RetResult.success(new DetectedServer(ip, up, ports));
    }

    /**
     * 添加一个服务器
     * @param ip ip
     * @return 1:up; -1:down
     */
    @Override
    public RetResult<Integer> addServer(String ip) {
        HashOperations<String,String,Boolean> ops1 = redisTemplate.opsForHash();
        boolean up = DsapUtil.serverDetect(ip);
        ops1.put(DSAP_SERVER, ip, up);

        updateAll();
        return RetResult.success(up?1:-1);
    }

    @Override
    public RetResult<Integer> delServer(String ip) {
        HashOperations ops = redisTemplate.opsForHash();
        ops.delete(DSAP_SERVER, ip);
        ops.delete(DSAP_PORTS, ip);
        updateAll();
        return RetResult.success(1);
    }

    /**
     * 从缓存获取所有服务状态
     * @return ret
     */
    public String selectAll() {
        ValueOperations<String, String> cache = redisTemplate.opsForValue();
        String res = cache.get(DSAP_CACHE);
        if (res != null) return res;
        updateAll();
        return cache.get(DSAP_CACHE);
    }

    /**
     * 提供给 {@link #updateAll()} 使用的内部select方法
     * @return 结果
     */
    private List<DetectedServer> selectAll0() {
        HashOperations<String, String, List<DetectedPort>> ops = redisTemplate.opsForHash();
        Map<String, List<DetectedPort>> ports = ops.entries(DSAP_PORTS);

        HashOperations<String,String,Boolean> ops1 = redisTemplate.opsForHash();
        Map<String, Boolean> entries = ops1.entries(DSAP_SERVER);
        Iterator<Map.Entry<String, Boolean>> iterator = entries.entrySet().iterator();
        List<DetectedServer> servers = new ArrayList<>(entries.size());
        while (iterator.hasNext()){
            Map.Entry<String, Boolean> next = iterator.next();
            String ip = next.getKey();
            List<DetectedPort> tmp = ports.get(ip);
            if (tmp == null) tmp = new ArrayList<>();
            servers.add(new DetectedServer(ip,next.getValue(), tmp));
        }

        return servers;
    }

    /**
     * 定时任务定时的更新状态, 并且将结果写入cache, cache的结果是一个ret
     */
    @Override
    public void updateAll() {
        BoundHashOperations<String, String, Boolean> ops1 = redisTemplate.boundHashOps(DSAP_SERVER);
        BoundHashOperations<String, String, List<DetectedPort>> ops = redisTemplate.boundHashOps(DSAP_PORTS);
        ValueOperations<String, String> cache = redisTemplate.opsForValue();

        List<DetectedServer> data = selectAll0();
        Map<String, Boolean> serverStatus = new HashMap<>(data.size());
        Map<String, List<DetectedPort>> portsStatus = new HashMap<>();
        StringBuilder msg = new StringBuilder();
        for (DetectedServer server:data) {
            String ip = server.getIp();

            // 检测当前服务器up
            boolean serverUp = DsapUtil.serverDetect(ip);
            serverStatus.put(ip, serverUp);
            server.setUp(serverUp);

            // 检测每个端口的状态
            List<DetectedPort> tmp = new ArrayList<>(server.getPorts().size());
            server.setPorts(transfer(server.getPorts()));
            for (DetectedPort port : server.getPorts()) {
                boolean portUp = DsapUtil.portDetect(ip, port.getPort());
                port.setUp(portUp);
                // 如果服务下线并且需要通知, 那么进行一次通知, 并且关闭通知, 避免密集通知
                if (!portUp && port.getAlert()) {
                    // <b>ip:port name<b/><br/>
                    msg.append("<b>").append(ip).append(":").append(port.getPort()).append(" ").append(port.getName()).append("</b></br>");
                    port.setAlert(false);
                }
                tmp.add(port);
            }
            portsStatus.put(ip, tmp);
        }

        // 进行一批次的通知,异步发送邮件
        if (!"".equals(msg.toString())){
            asyncService.serviceDownAlert(msg.toString());
        }

        // 更新redis中数据的状态
        ops1.putAll(serverStatus);
        ops.putAll(portsStatus);
        // 将数据写入cache中
        JSONObject jsonObject = new JSONObject(RetResult.success(data));
        cache.set(DSAP_CACHE, jsonObject.toString(), 10, TimeUnit.MINUTES);
    }

    /**
     * 将redis的get得到的原始数据修正成正确格式的对象
     * @param raw 从redis的get出来的原始数据
     * @return 正确的格式数据
     */
    private List<DetectedPort> transfer(List<DetectedPort> raw){
        String tmp = JSONObject.valueToString(raw);
        Type type = new TypeToken<List<DetectedPort>>() { }.getType();
        return new Gson().fromJson(tmp, type);
    }
}

服务监测结果展示

首先看一下前端后台的效果图,可以发现加入了四台服务器,每个服务器有相应的端口监测,“√”/“x”表示是否开启该端口异常下线的邮件通知。

接着看一下redis数据库的结果:

首先是这次服务监测产生的三个key都正常存在了,然后其中DSAP-CACHE缓存的结果也正常(不过看着好多转义符号啊2333).

查看DSAP-SERVER与DSAP-PORT的kv,也都正常。

redis序列化

序列化器:

默认的,redis对对象的序列化方式是 JdkSerializationRedisSerializer ,这种方式的结果因为是二进制数据,不借用专门的功能难以查阅结果,虽然反序列化时候不需要类型信息,但是缺点也很明显:被序列化的类需要实现Serializable接口、结果占用空间比较大。

StringRedisSerializer,只适合字符串类型数据的序列化,通常作为各种key的序列化方式。

GenericToStringSerializer,比上面一个更加通用,可以传递类型后,生成字符串。

Jackson2JsonRedisSerializer,比较常用的一种对值的序列化方式,结果非常简洁,速度快,但是需要一个序列化对象的类型作为参数。

GenericJackson2JsonRedisSerializer,比上面一个更加通用,存储的结果中会保存类型信息,所以结果占用会大一些。

当然还有其它的序列化器,比如阿里的FastJson2JsonRedisSerializer,这里就不做过多介绍了。

redisTemplate:

对于springboot的redis-starter自动配置类RedisAutoConfiguration来说,如果我们没有配置redisTemplate命名的RedisTemplate对象或者stringRedisTemplate类型的对象,那么也都会自动注入一个自动配置的对象。

当然自动配置的redisTemplate并没有设置序列化器,所以我们可以有两种方式来更改,一种是将默认的引入到参数列表中然后直接修改,实际的对象只在堆中存在一个,也可以起redisTemplate名称的函数直接注册,这样就自动配置类的注册就不会生效了。我下面是第一种实现方式:

代码语言:javascript复制
  @Bean
    public RedisTemplate<Object, Object> redisStringTemplate(
            RedisTemplate<Object, Object> redisTemplate, ServerProperties properties){
        Jackson2JsonRedisSerializer<?> jksSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        StringRedisSerializer strSerializer = new StringRedisSerializer();

        redisTemplate.setKeySerializer(jksSerializer);
        redisTemplate.setValueSerializer(jksSerializer);

        redisTemplate.setHashKeySerializer(strSerializer);
        redisTemplate.setHashValueSerializer(jksSerializer);

        initData(redisTemplate, properties);
        return redisTemplate;
    }

可以看到我这里对hash的值使用了Jackson2JsonRedisSerializer,并且传入的序列化类型是Object.class,也就是最通用的。这也就造成了一个问题,那就是如果value是带泛型的数据,那么最后无法解析到精确到泛型特定类型的数据,例如List<XXX>,最终只能解析成ArrayList<LinkedHashMap>类型的数据,LinkedHashMap用来组装具体的数据kv(属性名、属性值),而非我们特定的XXX类型。具体的原理可以参考这两篇文章:

https://blog.csdn.net/qq_38074398/article/details/128233005 (从结论上解析)【1】 https://www.cnblogs.com/kzyuan/p/16312931.html (从底层设计细节解析)【2】 GenericJackson2JsonRedisSerializer根据额外插入的类全限定名通过反射可以正确得到实体类的实例。 而Jackson2JsonRedisSerializer由于没有插入额外的信息,那么只能通过不同的数据结构来组装反序列化后的内容。对于List的数据会反序列化成ArrayList<LinkedHashMap>,使用LinkedHashMap来组装实体类对象的字段与字段值。

泛型的解析:

那么如何讲ArrayList<LinkedHashMap>转为特定的类型呢?一种解决方式是上方【2】链接文章中提到的,使用ObjectMapper进行类型转化。

代码语言:javascript复制
    @Test
    public void testRange() {
        String key = "right_push_all_01";
        List<LinkedHashMap<String, Object>> linkedHashMapList = redisService.lRange(key, 0, -1);
        ObjectMapper objectMapper = new ObjectMapper();
        List<ThisIsDTO> thisIsDTOList = objectMapper.convertValue(linkedHashMapList, new TypeReference<List<ThisIsDTO>>() { });
        for (ThisIsDTO thisIsDTO : thisIsDTOList) {
            System.out.println(thisIsDTO.getAge());
        }
    }

当然我这里采用另外一种取巧的方式,就是先转为json字符串,再使用gson进行类型修正,当然也可以不返回,直接raw = new Gson...也可以。

代码语言:javascript复制
    private List<DetectedPort> transfer(List<DetectedPort> raw){
        String tmp = JSONObject.valueToString(raw);
        Type type = new TypeToken<List<DetectedPort>>() { }.getType();
        return new Gson().fromJson(tmp, type);
    }

这样就可以正确的当作特定类型的List来操作了。

0 人点赞