聊聊jetcache的CacheMonitor

2024-06-21 12:21:02 浏览数 (1)

本文主要研究一下jetcache的CacheMonitor

CacheMonitor

jetcache-core/src/main/java/com/alicp/jetcache/CacheMonitor.java

代码语言:java
/**
 * Single-method callback that receives a {@link CacheEvent}.
 *
 * <p>Judging by the method name, implementations are notified after a cache
 * operation has been carried out (NOTE(review): the call site is not shown here).
 */
@FunctionalInterface
public interface CacheMonitor {

    /**
     * Handles one cache event.
     *
     * @param event the event to consume
     */
    void afterOperation(CacheEvent event);

}

CacheMonitor是一个FunctionalInterface,它定义了afterOperation方法用于消费CacheEvent,它有两个实现类,分别是DefaultCacheMonitor、CacheNotifyMonitor

CacheEvent

jetcache-core/src/main/java/com/alicp/jetcache/event/CacheEvent.java

代码语言:java
/**
 * Base class for cache events; it only carries the {@link Cache} the event refers to.
 */
public class CacheEvent {

    /** The cache this event refers to; assigned by the constructor. */
    protected Cache cache;

    /**
     * Creates an event bound to the given cache.
     *
     * @param cache the cache this event refers to (stored as-is, no null check)
     */
    public CacheEvent(Cache cache) {
        this.cache = cache;
    }

    /**
     * @return the cache this event refers to
     */
    public Cache getCache() {
        return cache;
    }

}

CacheEvent定义了cache属性,它有CacheGetEvent、CacheGetAllEvent、CacheLoadEvent、CacheLoadAllEvent、CachePutEvent、CachePutAllEvent、CacheRemoveEvent、CacheRemoveAllEvent这几个子类

DefaultCacheMonitor

jetcache-core/src/main/java/com/alicp/jetcache/support/DefaultCacheMonitor.java

代码语言:java
public class DefaultCacheMonitor implements CacheMonitor {

    private static final Logger logger = LoggerFactory.getLogger(DefaultCacheMonitor.class);

    protected CacheStat cacheStat;
    private String cacheName;

    public DefaultCacheMonitor(String cacheName) {
        if (cacheName == null) {
            throw new NullPointerException();
        }
        this.cacheName = cacheName;
        resetStat();
    }

    public String getCacheName() {
        return cacheName;
    }

    public synchronized void resetStat() {
        cacheStat = new CacheStat();
        cacheStat.setStatStartTime(System.currentTimeMillis());
        cacheStat.setCacheName(cacheName);
    }

    public synchronized CacheStat getCacheStat() {
        CacheStat stat = cacheStat.clone();
        stat.setStatEndTime(System.currentTimeMillis());
        return stat;
    }

    @Override
    public synchronized void afterOperation(CacheEvent event) {
        if (event instanceof CacheGetEvent) {
            CacheGetEvent e = (CacheGetEvent) event;
            afterGet(e.getMillis(), e.getKey(), e.getResult());
        } else if (event instanceof CachePutEvent) {
            CachePutEvent e = (CachePutEvent) event;
            afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult());
        } else if (event instanceof CacheRemoveEvent) {
            CacheRemoveEvent e = (CacheRemoveEvent) event;
            afterRemove(e.getMillis(), e.getKey(), e.getResult());
        } else if (event instanceof CacheLoadEvent) {
            CacheLoadEvent e = (CacheLoadEvent) event;
            afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess());
        } else if (event instanceof CacheGetAllEvent) {
            CacheGetAllEvent e = (CacheGetAllEvent) event;
            afterGetAll(e.getMillis(), e.getKeys(), e.getResult());
        } else if (event instanceof CacheLoadAllEvent) {
            CacheLoadAllEvent e = (CacheLoadAllEvent) event;
            afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess());
        } else if (event instanceof CachePutAllEvent) {
            CachePutAllEvent e = (CachePutAllEvent) event;
            afterPutAll(e.getMillis(), e.getMap(), e.getResult());
        } else if (event instanceof CacheRemoveAllEvent) {
            CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
            afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult());
        }
    }

    private void afterGet(long millis, Object key, CacheGetResult result) {
        cacheStat.minGetTime = Math.min(cacheStat.minGetTime, millis);
        cacheStat.maxGetTime = Math.max(cacheStat.maxGetTime, millis);
        cacheStat.getTimeSum  = millis;
        cacheStat.getCount  ;
        parseSingleGet(result);
    }

    private void parseSingleGet(CacheGetResult result) {
        switch (result.getResultCode()) {
            case SUCCESS:
                cacheStat.getHitCount  ;
                break;
            case NOT_EXISTS:
                cacheStat.getMissCount  ;
                break;
            case EXPIRED:
                cacheStat.getExpireCount  ;
                break;
            case FAIL:
                cacheStat.getFailCount  ;
                break;
            default:
                logger.warn("jetcache get return unexpected code: "   result.getResultCode());
        }
    }

    private void afterPut(long millis, Object key, Object value, CacheResult result) {
        cacheStat.minPutTime = Math.min(cacheStat.minPutTime, millis);
        cacheStat.maxPutTime = Math.max(cacheStat.maxPutTime, millis);
        cacheStat.putTimeSum  = millis;
        cacheStat.putCount  ;
        switch (result.getResultCode()) {
            case SUCCESS:
                cacheStat.putSuccessCount  ;
                break;
            case FAIL:
            case PART_SUCCESS:
                cacheStat.putFailCount  ;
                break;
            case EXISTS:
                break;
            default:
                logger.warn("jetcache PUT return unexpected code: "   result.getResultCode());
        }
    }

    private void afterRemove(long millis, Object key, CacheResult result) {
        cacheStat.minRemoveTime = Math.min(cacheStat.minRemoveTime, millis);
        cacheStat.maxRemoveTime = Math.max(cacheStat.maxRemoveTime, millis);
        cacheStat.removeTimeSum  = millis;
        cacheStat.removeCount  ;
        switch (result.getResultCode()) {
            case SUCCESS:
            case NOT_EXISTS:
                cacheStat.removeSuccessCount  ;
                break;
            case FAIL:
            case PART_SUCCESS:
                cacheStat.removeFailCount  ;
                break;
            default:
                logger.warn("jetcache REMOVE return unexpected code: "   result.getResultCode());
        }
    }

    private void afterLoad(long millis, Object key, Object loadedValue, boolean success) {
        cacheStat.minLoadTime = Math.min(cacheStat.minLoadTime, millis);
        cacheStat.maxLoadTime = Math.max(cacheStat.maxLoadTime, millis);
        cacheStat.loadTimeSum  = millis;
        cacheStat.loadCount  ;
        if (success) {
            cacheStat.loadSuccessCount  ;
        } else {
            cacheStat.loadFailCount  ;
        }
    }

    private void afterLoadAll(long millis, Set keys, Map loadedValue, boolean success) {
        if (keys == null) {
            return;
        }
        int count = keys.size();
        cacheStat.minLoadTime = Math.min(cacheStat.minLoadTime, millis);
        cacheStat.maxLoadTime = Math.max(cacheStat.maxLoadTime, millis);
        cacheStat.loadTimeSum  = millis;
        cacheStat.loadCount  = count;
        if (success) {
            cacheStat.loadSuccessCount  = count;
        } else {
            cacheStat.loadFailCount  = count;
        }
    }

    private void afterGetAll(long millis, Set keys, MultiGetResult result) {
        if (keys == null) {
            return;
        }
        int keyCount = keys.size();
        cacheStat.minGetTime = Math.min(cacheStat.minGetTime, millis);
        cacheStat.maxGetTime = Math.max(cacheStat.maxGetTime, millis);
        cacheStat.getTimeSum  = millis;
        cacheStat.getCount  = keyCount;
        Map resultValues = result.getValues();
        if (resultValues == null) {
            cacheStat.getFailCount  = keyCount;
        } else {
            for (Object singleResult : resultValues.values()) {
                CacheGetResult r = ((CacheGetResult) singleResult);
                parseSingleGet(r);
            }
        }
    }

    private void afterRemoveAll(long millis, Set keys, CacheResult result) {
        if (keys == null) {
            return;
        }
        int keyCount = keys.size();
        cacheStat.minRemoveTime = Math.min(cacheStat.minRemoveTime, millis);
        cacheStat.maxRemoveTime = Math.max(cacheStat.maxRemoveTime, millis);
        cacheStat.removeTimeSum  = millis;
        cacheStat.removeCount  = keyCount;
        if (result.isSuccess()) {
            cacheStat.removeSuccessCount  = keyCount;
        } else {
            cacheStat.removeFailCount  = keyCount;
        }
    }

    private void afterPutAll(long millis, Map map, CacheResult result) {
        if (map == null) {
            return;
        }
        int keyCount = map.size();
        cacheStat.minPutTime = Math.min(cacheStat.minPutTime, millis);
        cacheStat.maxPutTime = Math.max(cacheStat.maxPutTime, millis);
        cacheStat.putTimeSum  = millis;
        cacheStat.putCount  = keyCount;
        if (result.isSuccess()) {
            cacheStat.putSuccessCount  = keyCount;
        } else {
            cacheStat.putFailCount  = keyCount;
        }
    }

}

DefaultCacheMonitor实现了CacheMonitor接口,其afterOperation方法根据CacheEvent的具体类型来执行不同的处理逻辑,CacheGetEvent执行afterGet,CachePutEvent执行afterPut,CacheRemoveEvent执行afterRemove,CacheLoadEvent执行afterLoad,CacheGetAllEvent执行afterGetAll,CacheLoadAllEvent执行afterLoadAll,CachePutAllEvent执行afterPutAll,CacheRemoveAllEvent执行afterRemoveAll;这些方法主要是维护CacheStat的相关属性

CacheStat

jetcache-core/src/main/java/com/alicp/jetcache/support/CacheStat.java

代码语言:java
/**
 * Plain holder for the counters and timings of the get, put, remove and load operation groups.
 *
 * <p>Only the field declarations are shown in this listing. In {@code DefaultCacheMonitor}
 * the time fields are fed from {@code getMillis()} of the events and the start/end times
 * from {@code System.currentTimeMillis()}.
 */
public class CacheStat implements Serializable, Cloneable {

    private static final long serialVersionUID = -8802969946750554026L;

    // Identification and statistics window
    protected String cacheName;
    protected long statStartTime;
    protected long statEndTime;

    // GET counters; min starts at Long.MAX_VALUE so the first Math.min(...) replaces it
    protected long getCount;
    protected long getHitCount;
    protected long getMissCount;
    protected long getFailCount;
    protected long getExpireCount;
    protected long getTimeSum;
    protected long minGetTime = Long.MAX_VALUE;
    protected long maxGetTime = 0;

    // PUT counters
    protected long putCount;
    protected long putSuccessCount;
    protected long putFailCount;
    protected long putTimeSum;
    protected long minPutTime = Long.MAX_VALUE;
    protected long maxPutTime = 0;

    // REMOVE counters
    protected long removeCount;
    protected long removeSuccessCount;
    protected long removeFailCount;
    protected long removeTimeSum;
    protected long minRemoveTime = Long.MAX_VALUE;
    protected long maxRemoveTime = 0;

    // LOAD counters
    protected long loadCount;
    protected long loadSuccessCount;
    protected long loadFailCount;
    protected long loadTimeSum;
    protected long minLoadTime = Long.MAX_VALUE;
    protected long maxLoadTime = 0;

    // ...... (remaining members omitted in this listing)
}    

CacheStat定义了get、put、remove、load这几类动作的相关指标

CacheNotifyMonitor

jetcache-core/src/main/java/com/alicp/jetcache/support/CacheNotifyMonitor.java

代码语言:java
/**
 * {@link CacheMonitor} that turns put/remove style events into {@link CacheMessage}s and
 * publishes them through the {@link BroadcastManager} registered for its area.
 */
public class CacheNotifyMonitor implements CacheMonitor {
    private final BroadcastManager broadcastManager;
    private final String area;
    private final String cacheName;
    private final String sourceId;

    /**
     * @param cacheManager manager queried for the broadcast manager of {@code area}
     * @param area         area name written into every published message
     * @param cacheName    cache name written into every published message
     */
    public CacheNotifyMonitor(CacheManager cacheManager, String area, String cacheName) {
        this.broadcastManager = cacheManager.getBroadcastManager(area);
        this.area = area;
        this.cacheName = cacheName;
        // Without a broadcast manager there is no source id (and nothing is ever published).
        this.sourceId = (broadcastManager == null) ? null : broadcastManager.getSourceId();
    }

    /**
     * Same as the three-argument constructor, using {@code CacheConsts.DEFAULT_AREA}.
     */
    public CacheNotifyMonitor(CacheManager cacheManager, String cacheName) {
        this(cacheManager, CacheConsts.DEFAULT_AREA, cacheName);
    }

    /** Applies the local cache's key convertor, or returns the key unchanged if none is set. */
    private Object convertKey(Object key, AbstractEmbeddedCache localCache) {
        Function convertor = localCache.config().getKeyConvertor();
        return convertor == null ? key : convertor.apply(key);
    }

    /**
     * Returns the first {@link AbstractEmbeddedCache} level of a {@link MultiLevelCache},
     * or {@code null} when the cache is not multi-level or has no such level.
     */
    private AbstractEmbeddedCache getLocalCache(AbstractCache absCache) {
        if (absCache instanceof MultiLevelCache) {
            for (Cache level : ((MultiLevelCache) absCache).caches()) {
                if (level instanceof AbstractEmbeddedCache) {
                    return (AbstractEmbeddedCache) level;
                }
            }
        }
        return null;
    }

    /** Creates a message pre-filled with this monitor's area, cache name and source id. */
    private CacheMessage newMessage() {
        CacheMessage message = new CacheMessage();
        message.setArea(area);
        message.setCacheName(cacheName);
        message.setSourceId(sourceId);
        return message;
    }

    /**
     * Publishes a message for put, remove, putAll and removeAll events. Nothing happens when
     * there is no broadcast manager, the cache is closed, or no local cache level is found.
     * For the batch events the message is still published when the key collection is
     * {@code null}; its keys are simply left unset.
     */
    @Override
    public void afterOperation(CacheEvent event) {
        if (this.broadcastManager == null) {
            return;
        }
        AbstractCache absCache = CacheUtil.getAbstractCache(event.getCache());
        if (absCache.isClosed()) {
            return;
        }
        AbstractEmbeddedCache localCache = getLocalCache(absCache);
        if (localCache == null) {
            return;
        }

        if (event instanceof CachePutEvent) {
            CacheMessage message = newMessage();
            CachePutEvent put = (CachePutEvent) event;
            message.setType(CacheMessage.TYPE_PUT);
            message.setKeys(new Object[]{convertKey(put.getKey(), localCache)});
            broadcastManager.publish(message);
        } else if (event instanceof CacheRemoveEvent) {
            CacheMessage message = newMessage();
            CacheRemoveEvent remove = (CacheRemoveEvent) event;
            message.setType(CacheMessage.TYPE_REMOVE);
            message.setKeys(new Object[]{convertKey(remove.getKey(), localCache)});
            broadcastManager.publish(message);
        } else if (event instanceof CachePutAllEvent) {
            CacheMessage message = newMessage();
            CachePutAllEvent putAll = (CachePutAllEvent) event;
            message.setType(CacheMessage.TYPE_PUT_ALL);
            if (putAll.getMap() != null) {
                message.setKeys(putAll.getMap().keySet().stream()
                        .map(k -> convertKey(k, localCache))
                        .toArray());
            }
            broadcastManager.publish(message);
        } else if (event instanceof CacheRemoveAllEvent) {
            CacheMessage message = newMessage();
            CacheRemoveAllEvent removeAll = (CacheRemoveAllEvent) event;
            message.setType(CacheMessage.TYPE_REMOVE_ALL);
            if (removeAll.getKeys() != null) {
                message.setKeys(removeAll.getKeys().stream()
                        .map(k -> convertKey(k, localCache))
                        .toArray());
            }
            broadcastManager.publish(message);
        }
    }
}

CacheNotifyMonitor实现了CacheMonitor接口,其构造器通过cacheManager获取broadcastManager;其afterOperation在broadcastManager为null、cache已关闭或者localCache为null时不做处理,否则根据event的具体类型构建对应type的CacheMessage,最后通过broadcastManager.publish(m)发布消息

BroadcastManager

jetcache-core/src/main/java/com/alicp/jetcache/support/BroadcastManager.java

代码语言:java
/**
 * Abstract base for components that publish {@link CacheMessage}s and subscribe to them.
 * Only part of the class is shown in this listing.
 */
public abstract class BroadcastManager implements AutoCloseable {
    private static Logger logger = LoggerFactory.getLogger(BroadcastManager.class);

    // Random UUID generated once per instance
    private final String sourceId = UUID.randomUUID().toString();
    private final CacheManager cacheManager;

    /**
     * @param cacheManager the cache manager stored by this broadcast manager
     */
    public BroadcastManager(CacheManager cacheManager) {
        this.cacheManager = cacheManager;
    }

    /**
     * Validates that the configuration has a broadcast channel, a value encoder and a value decoder.
     *
     * @param config the external cache configuration to check
     * @throws CacheConfigException if any of the three settings is {@code null}
     */
    protected void checkConfig(ExternalCacheConfig config) {
        if (config.getBroadcastChannel() == null) {
            throw new CacheConfigException("BroadcastChannel not set");
        }
        if (config.getValueEncoder() == null) {
            throw new CacheConfigException("no value encoder");
        }
        if (config.getValueDecoder() == null) {
            throw new CacheConfigException("no value decoder");
        }
    }

    /**
     * Publishes a cache message; the transport is defined by the subclass.
     *
     * @param cacheMessage the message to publish
     * @return the result of the publish operation
     */
    public abstract CacheResult publish(CacheMessage cacheMessage);

    /** Starts receiving messages; the mechanism is defined by the subclass. */
    public abstract void startSubscribe();

    /** Does nothing by default; subclasses may override to release resources. */
    @Override
    public void close() throws Exception {
    }

    // ...... (remaining members omitted in this listing)
}    

BroadcastManager是个抽象类,实现了AutoCloseable接口,其close方法默认为空实现,它定义了publish及startSubscribe两个抽象方法;它主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现

CacheMonitorInstaller

jetcache-core/src/main/java/com/alicp/jetcache/template/CacheMonitorInstaller.java

代码语言:java
/**
 * Strategy for attaching monitors to a cache.
 */
public interface CacheMonitorInstaller {
    /**
     * Adds this installer's monitors to the given cache.
     *
     * @param cacheManager the cache manager in use
     * @param cache        the cache to attach monitors to
     * @param quickConfig  the configuration the cache was built from
     */
    void addMonitors(CacheManager cacheManager, Cache cache, QuickConfig quickConfig);
}

CacheMonitorInstaller接口定义了addMonitors方法,它有MetricsMonitorInstaller、NotifyMonitorInstaller两个实现类

MetricsMonitorInstaller

jetcache-core/src/main/java/com/alicp/jetcache/template/MetricsMonitorInstaller.java

代码语言:java
public class MetricsMonitorInstaller extends AbstractLifecycle implements CacheMonitorInstaller {

    private final Consumer<StatInfo> metricsCallback;
    private final Duration interval;

    private DefaultMetricsManager metricsManager;

    public MetricsMonitorInstaller(Consumer<StatInfo> metricsCallback, Duration interval) {
        this.metricsCallback = metricsCallback;
        this.interval = interval;
    }

    @Override
    protected void doInit() {
        if (metricsCallback != null && interval != null) {
            metricsManager = new DefaultMetricsManager((int) interval.toMinutes(),
                    TimeUnit.MINUTES, metricsCallback);
            metricsManager.start();
        }
    }

    @Override
    protected void doShutdown() {
        if (metricsManager != null) {
            metricsManager.stop();
            metricsManager.clear();
            metricsManager = null;
        }
    }

    @Override
    public void addMonitors(CacheManager cacheManager, Cache cache, QuickConfig quickConfig) {
        if (metricsManager == null) {
            return;
        }
        cache = CacheUtil.getAbstractCache(cache);
        if (cache instanceof MultiLevelCache) {
            MultiLevelCache mc = (MultiLevelCache) cache;
            if (mc.caches().length == 2) {
                Cache local = mc.caches()[0];
                Cache remote = mc.caches()[1];
                DefaultCacheMonitor localMonitor = new DefaultCacheMonitor(quickConfig.getName()   "_local");
                local.config().getMonitors().add(localMonitor);
                DefaultCacheMonitor remoteMonitor = new DefaultCacheMonitor(quickConfig.getName()   "_remote");
                remote.config().getMonitors().add(remoteMonitor);
                metricsManager.add(localMonitor, remoteMonitor);
            }
        }

        DefaultCacheMonitor monitor = new DefaultCacheMonitor(quickConfig.getName());
        cache.config().getMonitors().add(monitor);
        metricsManager.add(monitor);
    }
}

MetricsMonitorInstaller实现了CacheMonitorInstaller接口,其addMonitors方法在metricsManager为null(未初始化,或者metricsCallback、interval为null)时直接返回;否则会创建DefaultCacheMonitor并添加到cache的config中,对于恰好有两级的MultiLevelCache还会分别创建localMonitor、remoteMonitor并添加到对应cache的config中

NotifyMonitorInstaller

jetcache-core/src/main/java/com/alicp/jetcache/template/NotifyMonitorInstaller.java

代码语言:java
/**
 * {@link CacheMonitorInstaller} that attaches a {@link CacheNotifyMonitor} to multi-level
 * caches configured with {@code syncLocal}, creating the area's {@link BroadcastManager}
 * on first use.
 */
public class NotifyMonitorInstaller implements CacheMonitorInstaller {

    private final Function<String, CacheBuilder> remoteBuilderTemplate;

    /**
     * @param remoteBuilderTemplate lookup from area name to the remote cache builder of that area
     */
    public NotifyMonitorInstaller(Function<String, CacheBuilder> remoteBuilderTemplate) {
        this.remoteBuilderTemplate = remoteBuilderTemplate;
    }

    /**
     * Adds a {@link CacheNotifyMonitor} to the cache. Returns without doing anything when
     * {@code syncLocal} is not enabled, the cache is not a {@link MultiLevelCache}, or the
     * area's remote builder is missing, does not support broadcast, or has no broadcast channel.
     */
    @Override
    public void addMonitors(CacheManager cacheManager, Cache cache, QuickConfig quickConfig) {
        Boolean syncLocal = quickConfig.getSyncLocal();
        if (syncLocal == null || !syncLocal) {
            return;
        }
        if (!(CacheUtil.getAbstractCache(cache) instanceof MultiLevelCache)) {
            return;
        }

        String area = quickConfig.getArea();
        final ExternalCacheBuilder cacheBuilder = (ExternalCacheBuilder) remoteBuilderTemplate.apply(area);
        if (cacheBuilder == null) {
            return;
        }
        if (!cacheBuilder.supportBroadcast()) {
            return;
        }
        if (cacheBuilder.getConfig().getBroadcastChannel() == null) {
            return;
        }

        registerBroadcastManagerIfAbsent(cacheManager, area, cacheBuilder);

        CacheMonitor notifyMonitor = new CacheNotifyMonitor(cacheManager, area, quickConfig.getName());
        cache.config().getMonitors().add(notifyMonitor);
    }

    /**
     * Creates, starts and registers a broadcast manager for the area unless the cache
     * manager already has one, or the builder returns {@code null}.
     */
    private void registerBroadcastManagerIfAbsent(CacheManager cacheManager, String area,
                                                  ExternalCacheBuilder cacheBuilder) {
        if (cacheManager.getBroadcastManager(area) != null) {
            return;
        }
        BroadcastManager created = cacheBuilder.createBroadcastManager(cacheManager);
        if (created == null) {
            return;
        }
        created.startSubscribe();
        cacheManager.putBroadcastManager(area, created);
    }
}

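NotifyMonitorInstaller实现了CacheMonitorInstaller接口,其addMonitors方法在未开启syncLocal、cache不是MultiLevelCache,或者对应area的cacheBuilder为null、不支持广播、未配置broadcastChannel时不做处理;否则若cacheManager中该area还没有BroadcastManager,则通过cacheBuilder创建一个,执行其startSubscribe方法并添加到cacheManager;最后创建CacheNotifyMonitor,添加到cache的config中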

ConfigProvider

代码语言:java
/**
 * Lifecycle component that prepares cache builders from the {@link GlobalCacheConfig}
 * and sets up the cache monitor installers. Only part of the class is shown in this listing.
 */
public class ConfigProvider extends AbstractLifecycle {

    private static final Logger logger = LoggerFactory.getLogger(ConfigProvider.class);

    @Resource
    protected GlobalCacheConfig globalCacheConfig;

    protected EncoderParser encoderParser;
    protected KeyConvertorParser keyConvertorParser;
    private Consumer<StatInfo> metricsCallback;

    private CacheBuilderTemplate cacheBuilderTemplate;

    /** Installs the default encoder parser, key convertor parser and metrics callback. */
    public ConfigProvider() {
        encoderParser = new DefaultEncoderParser();
        keyConvertorParser = new DefaultKeyConvertorParser();
        metricsCallback = new StatInfoLogger(false);
    }

    /**
     * Builds the {@link CacheBuilderTemplate}, replaces every {@link ParserFunction}
     * placeholder in the local and remote builders with the parsed key convertor /
     * value encoder / value decoder, then initialises the monitor installers.
     */
    @Override
    protected void doInit() {
        cacheBuilderTemplate = new CacheBuilderTemplate(globalCacheConfig.isPenetrationProtect(),
                globalCacheConfig.getLocalCacheBuilders(), globalCacheConfig.getRemoteCacheBuilders());
        // Local builders: only the key convertor can be a placeholder.
        for (CacheBuilder builder : globalCacheConfig.getLocalCacheBuilders().values()) {
            EmbeddedCacheBuilder eb = (EmbeddedCacheBuilder) builder;
            if (eb.getConfig().getKeyConvertor() instanceof ParserFunction) {
                ParserFunction f = (ParserFunction) eb.getConfig().getKeyConvertor();
                eb.setKeyConvertor(parseKeyConvertor(f.getValue()));
            }
        }
        // Remote builders: key convertor, value encoder and value decoder are resolved.
        for (CacheBuilder builder : globalCacheConfig.getRemoteCacheBuilders().values()) {
            ExternalCacheBuilder eb = (ExternalCacheBuilder) builder;
            if (eb.getConfig().getKeyConvertor() instanceof ParserFunction) {
                ParserFunction f = (ParserFunction) eb.getConfig().getKeyConvertor();
                eb.setKeyConvertor(parseKeyConvertor(f.getValue()));
            }
            if (eb.getConfig().getValueEncoder() instanceof ParserFunction) {
                ParserFunction f = (ParserFunction) eb.getConfig().getValueEncoder();
                eb.setValueEncoder(parseValueEncoder(f.getValue()));
            }
            if (eb.getConfig().getValueDecoder() instanceof ParserFunction) {
                ParserFunction f = (ParserFunction) eb.getConfig().getValueDecoder();
                eb.setValueDecoder(parseValueDecoder(f.getValue()));
            }
        }
        initCacheMonitorInstallers();
    }

    // ...... (remaining members omitted in this listing)
}    

ConfigProvider继承了AbstractLifecycle,它覆盖了doInit方法,该方法先创建cacheBuilderTemplate,之后根据globalCacheConfig.getLocalCacheBuilders()去设置setKeyConvertor,根据globalCacheConfig.getRemoteCacheBuilders()设置setKeyConvertor、setValueEncoder、setValueDecoder,最后执行initCacheMonitorInstallers方法

initCacheMonitorInstallers

代码语言:java
    /**
     * Registers the metrics and notify installers with the builder template, then calls
     * {@code init()} on every registered installer that is an {@link AbstractLifecycle}.
     */
    protected void initCacheMonitorInstallers() {
        cacheBuilderTemplate.getCacheMonitorInstallers().add(metricsMonitorInstaller());
        cacheBuilderTemplate.getCacheMonitorInstallers().add(notifyMonitorInstaller());
        for (CacheMonitorInstaller installer : cacheBuilderTemplate.getCacheMonitorInstallers()) {
            if (!(installer instanceof AbstractLifecycle)) {
                continue;
            }
            AbstractLifecycle lifecycle = (AbstractLifecycle) installer;
            lifecycle.init();
        }
    }

    /**
     * Creates and initialises the {@link MetricsMonitorInstaller}. The interval is taken from
     * {@code statIntervalMinutes} when it is positive and is {@code null} otherwise.
     *
     * @return the initialised installer
     */
    protected CacheMonitorInstaller metricsMonitorInstaller() {
        Duration interval = globalCacheConfig.getStatIntervalMinutes() > 0
                ? Duration.ofMinutes(globalCacheConfig.getStatIntervalMinutes())
                : null;

        MetricsMonitorInstaller installer = new MetricsMonitorInstaller(metricsCallback, interval);
        installer.init();
        return installer;
    }

    /**
     * Creates the {@link NotifyMonitorInstaller}.
     *
     * @return an installer whose builder lookup resolves an area name against the remote cache builders
     */
    protected CacheMonitorInstaller notifyMonitorInstaller() {
        // The lambda reads getRemoteCacheBuilders() on every lookup rather than capturing the map once.
        return new NotifyMonitorInstaller(area -> globalCacheConfig.getRemoteCacheBuilders().get(area));
    }        

initCacheMonitorInstallers会执行metricsMonitorInstaller、notifyMonitorInstaller并添加到cacheBuilderTemplate.getCacheMonitorInstallers(),最后遍历cacheBuilderTemplate.getCacheMonitorInstallers()对于是AbstractLifecycle类型的挨个执行init方法;metricsMonitorInstaller方法创建MetricsMonitorInstaller并执行其init方法;notifyMonitorInstaller方法创建NotifyMonitorInstaller

小结

  • CacheMonitor是一个FunctionalInterface,它定义了afterOperation方法用于消费CacheEvent,它有两个实现类,分别是DefaultCacheMonitor、CacheNotifyMonitor

  • DefaultCacheMonitor实现了CacheMonitor接口,其afterOperation方法根据CacheEvent的具体类型来执行不同的处理逻辑,主要是维护CacheStat的相关属性
  • CacheNotifyMonitor实现了CacheMonitor接口,根据不同的event具体类型来构建不同的type的CacheMessage,最后通过broadcastManager.publish(m)去发布消息
  • BroadcastManager是个抽象类,主要有MockRemoteCacheBuilder的匿名实现、RedisBroadcastManager、LettuceBroadcastManager、SpringDataBroadcastManager、RedissonBroadcastManager这几个实现

  • CacheMonitorInstaller接口定义了addMonitors方法,它有MetricsMonitorInstaller、NotifyMonitorInstaller两个实现类
  • ConfigProvider的doInit方法,会执行initCacheMonitorInstallers方法,主要是执行metricsMonitorInstaller、notifyMonitorInstaller并添加到cacheBuilderTemplate.getCacheMonitorInstallers()中

0 人点赞