聊聊jetcache的ProxyCache

2024-06-19 09:26:15 浏览数 (1)

本文主要研究一下jetcache的ProxyCache

ProxyCache

jetcache-core/src/main/java/com/alicp/jetcache/ProxyCache.java

代码语言:javascript复制
public interface ProxyCache<K, V> extends Cache<K, V> {
    Cache<K, V> getTargetCache();

    @Override
    default <T> T unwrap(Class<T> clazz) {
        return getTargetCache().unwrap(clazz);
    }

}

ProxyCache继承了Cache接口,它定义了getTargetCache方法,并默认实现了unwrap方法

SimpleProxyCache

jetcache-core/src/main/java/com/alicp/jetcache/SimpleProxyCache.java

代码语言:javascript复制
public class SimpleProxyCache<K, V> implements ProxyCache<K, V> {

    protected Cache<K, V> cache;

    public SimpleProxyCache(Cache<K, V> cache) {
        this.cache = cache;
    }

    @Override
    public CacheConfig<K, V> config() {
        return cache.config();
    }

    @Override
    public Cache<K, V> getTargetCache() {
        return cache;
    }

    @Override
    public V get(K key) {
        return cache.get(key);
    }

    @Override
    public Map<K, V> getAll(Set<? extends K> keys) {
        return cache.getAll(keys);
    }

    @Override
    public void put(K key, V value) {
        cache.put(key, value);
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
        cache.putAll(map);
    }

    @Override
    public boolean putIfAbsent(K key, V value) {
        return cache.putIfAbsent(key, value);
    }

    @Override
    public boolean remove(K key) {
        return cache.remove(key);
    }

    @Override
    public void removeAll(Set<? extends K> keys) {
        cache.removeAll(keys);
    }

    @Override
    public <T> T unwrap(Class<T> clazz) {
        return cache.unwrap(clazz);
    }

    @Override
    public AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
        return cache.tryLock(key, expire, timeUnit);
    }

    @Override
    public boolean tryLockAndRun(K key, long expire, TimeUnit timeUnit, Runnable action) {
        return cache.tryLockAndRun(key, expire, timeUnit, action);
    }

    @Override
    public CacheGetResult<V> GET(K key) {
        return cache.GET(key);
    }

    @Override
    public MultiGetResult<K, V> GET_ALL(Set<? extends K> keys) {
        return cache.GET_ALL(keys);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader) {
        return cache.computeIfAbsent(key, loader);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
        return cache.computeIfAbsent(key, loader, cacheNullWhenLoaderReturnNull);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.computeIfAbsent(key, loader, cacheNullWhenLoaderReturnNull, expireAfterWrite, timeUnit);
    }

    @Override
    public void put(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
        cache.put(key, value, expireAfterWrite, timeUnit);
    }

    @Override
    public CacheResult PUT(K key, V value) {
        return cache.PUT(key, value);
    }

    @Override
    public CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.PUT(key, value, expireAfterWrite, timeUnit);
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
        cache.putAll(map, expireAfterWrite, timeUnit);
    }

    @Override
    public CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
        return cache.PUT_ALL(map);
    }

    @Override
    public CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.PUT_ALL(map, expireAfterWrite, timeUnit);
    }

    @Override
    public CacheResult REMOVE(K key) {
        return cache.REMOVE(key);
    }

    @Override
    public CacheResult REMOVE_ALL(Set<? extends K> keys) {
        return cache.REMOVE_ALL(keys);
    }

    @Override
    public CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
        return cache.PUT_IF_ABSENT(key, value, expireAfterWrite, timeUnit);
    }

    @Override
    public void close() {
        cache.close();
    }
}

SimpleProxyCache实现了ProxyCache接口,它定义了cache属性,其方法均委托给了cache对象

LoadingCache

jetcache-core/src/main/java/com/alicp/jetcache/LoadingCache.java

代码语言:javascript复制
public class LoadingCache<K, V> extends SimpleProxyCache<K, V> {

    protected Consumer<CacheEvent> eventConsumer;

    protected CacheConfig<K, V> config;

    public LoadingCache(Cache<K, V> cache) {
        super(cache);
        this.config = config();
        eventConsumer = CacheUtil.getAbstractCache(cache)::notify;
    }

    @Override
    public V get(K key) throws CacheInvokeException {
        CacheLoader<K, V> loader = config.getLoader();
        if (loader != null) {
            return AbstractCache.computeIfAbsentImpl(key, loader,
                    config.isCacheNullValue() ,0, null, this);
        } else {
            return cache.get(key);
        }
    }

    protected boolean needUpdate(V loadedValue, CacheLoader<K, V> loader) {
        if (loadedValue == null && !config.isCacheNullValue()) {
            return false;
        }
        if (loader.vetoCacheUpdate()) {
            return false;
        }
        return true;
    }

    @Override
    public Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
        CacheLoader<K, V> loader = config.getLoader();
        if (loader != null) {
            MultiGetResult<K, V> r = GET_ALL(keys);
            Map<K, V> kvMap;
            if (r.isSuccess() || r.getResultCode() == CacheResultCode.PART_SUCCESS) {
                kvMap = r.unwrapValues();
            } else {
                kvMap = new HashMap<>();
            }
            Set<K> keysNeedLoad = new LinkedHashSet<>();
            keys.forEach((k) -> {
                if (!kvMap.containsKey(k)) {
                    keysNeedLoad.add(k);
                }
            });
            if (!config.isCachePenetrationProtect()) {
                if (eventConsumer != null) {
                    loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
                }
                Map<K, V> loadResult;
                try {
                    loadResult = loader.loadAll(keysNeedLoad);

                    CacheLoader<K, V> theLoader = loader;
                    Map<K, V> updateValues = new HashMap<>();
                    loadResult.forEach((k,v)->{
                        if (needUpdate(v, theLoader)){
                            updateValues.put(k, v);
                        }
                    });
                    // batch put
                    if (!updateValues.isEmpty()) {
                        PUT_ALL(updateValues);
                    }
                } catch (Throwable e) {
                    throw new CacheInvokeException(e);
                }
                kvMap.putAll(loadResult);
            } else {
                AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
                loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
                for(K key : keysNeedLoad) {
                    Consumer<V> cacheUpdater = (v) -> {
                        if(needUpdate(v, config.getLoader())) {
                            PUT(key, v);
                        }
                    };
                    V v = AbstractCache.synchronizedLoad(config, abstractCache, key, loader, cacheUpdater);
                    kvMap.put(key, v);
                }
            }
            return kvMap;
        } else {
            return cache.getAll(keys);
        }

    }
}

LoadingCache继承了SimpleProxyCache,其get方法会判断config.getLoader()是否为null,不为null则执行AbstractCache.computeIfAbsentImpl(key, loader,config.isCacheNullValue() ,0, null, this);getAll方法多了loader以及config.isCachePenetrationProtect()的处理逻辑,对于开启cachePenetrationProtect的会通过CacheUtil.createProxyLoader包装一下loader,然后AbstractCache.synchronizedLoad去加载value

createProxyLoader

jetcache-core/src/main/java/com/alicp/jetcache/CacheUtil.java

代码语言:javascript复制
public class CacheUtil {

    private interface ProxyLoader<K, V> extends CacheLoader<K, V> {
    }

    public static <K, V> ProxyLoader<K, V> createProxyLoader(Cache<K, V> cache,
                                                             CacheLoader<K, V> loader,
                                                             Consumer<CacheEvent> eventConsumer) {
        if (loader instanceof ProxyLoader) {
            return (ProxyLoader<K, V>) loader;
        }
        return new ProxyLoader<K, V>() {
            @Override
            public V load(K key) throws Throwable {
                long t = System.currentTimeMillis();
                V v = null;
                boolean success = false;
                try {
                    v = loader.load(key);
                    success = true;
                } finally {
                    t = System.currentTimeMillis() - t;
                    CacheLoadEvent event = new CacheLoadEvent(cache, t, key, v, success);
                    eventConsumer.accept(event);
                }
                return v;
            }

            @Override
            public Map<K, V> loadAll(Set<K> keys) throws Throwable {
                long t = System.currentTimeMillis();
                boolean success = false;
                Map<K, V> kvMap = null;
                try {
                    kvMap = loader.loadAll(keys);
                    success = true;
                } finally {
                    t = System.currentTimeMillis() - t;
                    CacheLoadAllEvent event = new CacheLoadAllEvent(cache, t, keys, kvMap, success);
                    eventConsumer.accept(event);
                }
                return kvMap;
            }

            @Override
            public boolean vetoCacheUpdate() {
                return loader.vetoCacheUpdate();
            }
        };
    }

    public static <K, V> ProxyLoader<K, V> createProxyLoader(Cache<K, V> cache,
                                                          Function<K, V> loader,
                                                          Consumer<CacheEvent> eventConsumer) {
        if (loader instanceof ProxyLoader) {
            return (ProxyLoader<K, V>) loader;
        }
        if (loader instanceof CacheLoader) {
            return createProxyLoader(cache, (CacheLoader) loader, eventConsumer);
        }
        return k -> {
            long t = System.currentTimeMillis();
            V v = null;
            boolean success = false;
            try {
                v = loader.apply(k);
                success = true;
            } finally {
                t = System.currentTimeMillis() - t;
                CacheLoadEvent event = new CacheLoadEvent(cache, t, k, v, success);
                eventConsumer.accept(event);
            }
            return v;
        };
    }


    public static <K, V> AbstractCache<K, V> getAbstractCache(Cache<K, V> c) {
        while (c instanceof ProxyCache) {
            c = ((ProxyCache) c).getTargetCache();
        }
        return (AbstractCache) c;
    }

}

CacheUtil提供了两个createProxyLoader,一个loader类型为CacheLoader,一个为Function;其load及loadAll方法委托给了loader,主要是增加了耗时统计并创建CacheLoadEvent,交给eventConsumer.accept(event)去消费

synchronizedLoad

jetcache-core/src/main/java/com/alicp/jetcache/AbstractCache.java

代码语言:javascript复制
    static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
                                     K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
        ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
        Object lockKey = buildLoaderLockKey(abstractCache, key);
        while (true) {
            boolean create[] = new boolean[1];
            LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
                create[0] = true;
                LoaderLock loaderLock = new LoaderLock();
                loaderLock.signal = new CountDownLatch(1);
                loaderLock.loaderThread = Thread.currentThread();
                return loaderLock;
            });
            if (create[0] || ll.loaderThread == Thread.currentThread()) {
                try {
                    CacheGetResult<V> getResult = abstractCache.GET(key);
                    if (getResult.isSuccess()) {
                        ll.success = true;
                        ll.value = getResult.getValue();
                        return getResult.getValue();
                    } else {
                        V loadedValue = newLoader.apply(key);
                        ll.success = true;
                        ll.value = loadedValue;
                        cacheUpdater.accept(loadedValue);
                        return loadedValue;
                    }
                } finally {
                    if (create[0]) {
                        ll.signal.countDown();
                        loaderMap.remove(lockKey);
                    }
                }
            } else {
                try {
                    Duration timeout = config.getPenetrationProtectTimeout();
                    if (timeout == null) {
                        ll.signal.await();
                    } else {
                        boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
                        if(!ok) {
                            logger.info("loader wait timeout:"   timeout);
                            return newLoader.apply(key);
                        }
                    }
                } catch (InterruptedException e) {
                    logger.warn("loader wait interrupted");
                    return newLoader.apply(key);
                }
                if (ll.success) {
                    return (V) ll.value;
                } else {
                    continue;
                }

            }
        }
    }

AbstractCache提供了synchronizedLoad静态方法,它先通过buildLoaderLockKey来创建lockKey,之后while循环,computeIfAbsent指定lockKey的LoaderLock,对于loaderThread为当前线程的先通过GET获取result,若为success则返回,否则通过loader加载,然后通知cacheUpdater,最后执行ll.signal.countDown(),对于非当前thread的则根据指定的timeout执行ll.signal.await(),若await超时或者InterruptedException则执行newLoader.apply,否则对于success的直接返回value,非success则继续循环处理

RefreshCache

jetcache-core/src/main/java/com/alicp/jetcache/RefreshCache.java

代码语言:javascript复制
public class RefreshCache<K, V> extends LoadingCache<K, V> {

    private static final Logger logger = LoggerFactory.getLogger(RefreshCache.class);

    public static final byte[] LOCK_KEY_SUFFIX = "_#RL#".getBytes();
    public static final byte[] TIMESTAMP_KEY_SUFFIX = "_#TS#".getBytes();

    private ConcurrentHashMap<Object, RefreshTask> taskMap = new ConcurrentHashMap<>();

    private boolean multiLevelCache;

    public RefreshCache(Cache cache) {
        super(cache);
        multiLevelCache = isMultiLevelCache();
    }

    protected void stopRefresh() {
        List<RefreshTask> tasks = new ArrayList<>();
        tasks.addAll(taskMap.values());
        tasks.forEach(task -> task.cancel());
    }

    @Override
    public void close() {
        stopRefresh();
        super.close();
    }


    private boolean hasLoader() {
        return config.getLoader() != null;
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader) {
        return computeIfAbsent(key, loader, config().isCacheNullValue());
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
        return AbstractCache.computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
                0, null, this);
    }

    @Override
    public V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
                             long expireAfterWrite, TimeUnit timeUnit) {
        return AbstractCache.computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
                expireAfterWrite, timeUnit, this);
    }

    private boolean isMultiLevelCache() {
        Cache c = getTargetCache();
        while (c instanceof ProxyCache) {
            c = ((ProxyCache) c).getTargetCache();
        }
        return c instanceof MultiLevelCache;
    }

    private Object getTaskId(K key) {
        Cache c = concreteCache();
        if (c instanceof AbstractEmbeddedCache) {
            return ((AbstractEmbeddedCache) c).buildKey(key);
        } else if (c instanceof AbstractExternalCache) {
            byte[] bs = ((AbstractExternalCache) c).buildKey(key);
            return ByteBuffer.wrap(bs);
        } else {
            logger.error("can't getTaskId from "   c.getClass());
            return null;
        }
    }

    @Override
    public V get(K key) throws CacheInvokeException {
        if (config.getRefreshPolicy() != null && hasLoader()) {
            addOrUpdateRefreshTask(key, null);
        }
        return super.get(key);
    }

    @Override
    public Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
        if (config.getRefreshPolicy() != null && hasLoader()) {
            for (K key : keys) {
                addOrUpdateRefreshTask(key, null);
            }
        }
        return super.getAll(keys);
    }

    //......

    private byte[] combine(byte[] bs1, byte[] bs2) {
        byte[] newArray = Arrays.copyOf(bs1, bs1.length   bs2.length);
        System.arraycopy(bs2, 0, newArray, bs1.length, bs2.length);
        return newArray;
    }
}

RefreshCache继承了LoadingCache,它定义了taskMap用于存储RefreshTask,它覆盖了computeIfAbsent方法,复用了AbstractCache定义的静态方法computeIfAbsentImpl,传入的cache为this;其get及getAll方法对于config.getRefreshPolicy()及config.getLoader()不为null的会额外执行addOrUpdateRefreshTask

addOrUpdateRefreshTask

代码语言:javascript复制
    protected void addOrUpdateRefreshTask(K key, CacheLoader<K,V> loader) {
        RefreshPolicy refreshPolicy = config.getRefreshPolicy();
        if (refreshPolicy == null) {
            return;
        }
        long refreshMillis = refreshPolicy.getRefreshMillis();
        if (refreshMillis > 0) {
            Object taskId = getTaskId(key);
            RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> {
                logger.debug("add refresh task. interval={},  key={}", refreshMillis , key);
                RefreshTask task = new RefreshTask(taskId, key, loader);
                task.lastAccessTime = System.currentTimeMillis();
                ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(
                        task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS);
                task.future = future;
                return task;
            });
            refreshTask.lastAccessTime = System.currentTimeMillis();
        }
    }

addOrUpdateRefreshTask方法先获取refreshMillis,对于refreshMillis小于等于0的不处理,大于0的先通过getTaskId(key)获取taskId,然后通过taskMap.computeIfAbsent获取或者创建refreshTask,最后更新refreshTask的lastAccessTime;当新创建refreshTask时,会通过JJetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS)去调度该refreshTask,即每隔refreshMillis调度执行refreshTask

RefreshTask

代码语言:javascript复制
    class RefreshTask implements Runnable {
        private Object taskId;
        private K key;
        private CacheLoader<K, V> loader;

        private long lastAccessTime;
        private ScheduledFuture future;

        RefreshTask(Object taskId, K key, CacheLoader<K, V> loader) {
            this.taskId = taskId;
            this.key = key;
            this.loader = loader;
        }

        private void cancel() {
            logger.debug("cancel refresh: {}", key);
            future.cancel(false);
            taskMap.remove(taskId);
        }

        private void load() throws Throwable {
            CacheLoader<K,V> l = loader == null? config.getLoader(): loader;
            if (l != null) {
                l = CacheUtil.createProxyLoader(cache, l, eventConsumer);
                V v = l.load(key);
                if (needUpdate(v, l)) {
                    cache.PUT(key, v);
                }
            }
        }

        private void externalLoad(final Cache concreteCache, final long currentTime)
                throws Throwable {
            byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key);
            byte[] lockKey = combine(newKey, LOCK_KEY_SUFFIX);
            long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis();
            long refreshMillis = config.getRefreshPolicy().getRefreshMillis();
            byte[] timestampKey = combine(newKey, TIMESTAMP_KEY_SUFFIX);

            // AbstractExternalCache buildKey method will not convert byte[]
            CacheGetResult refreshTimeResult = concreteCache.GET(timestampKey);
            boolean shouldLoad = false;
            if (refreshTimeResult.isSuccess()) {
                shouldLoad = currentTime >= Long.parseLong(refreshTimeResult.getValue().toString())   refreshMillis;
            } else if (refreshTimeResult.getResultCode() == CacheResultCode.NOT_EXISTS) {
                shouldLoad = true;
            }

            if (!shouldLoad) {
                if (multiLevelCache) {
                    refreshUpperCaches(key);
                }
                return;
            }

            Runnable r = () -> {
                try {
                    load();
                    // AbstractExternalCache buildKey method will not convert byte[]
                    concreteCache.put(timestampKey, String.valueOf(System.currentTimeMillis()));
                } catch (Throwable e) {
                    throw new CacheException("refresh error", e);
                }
            };

            // AbstractExternalCache buildKey method will not convert byte[]
            boolean lockSuccess = concreteCache.tryLockAndRun(lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r);
            if(!lockSuccess && multiLevelCache) {
                JetCacheExecutor.heavyIOExecutor().schedule(
                        () -> refreshUpperCaches(key), (long)(0.2 * refreshMillis), TimeUnit.MILLISECONDS);
            }
        }

        private void refreshUpperCaches(K key) {
            MultiLevelCache<K, V> targetCache = (MultiLevelCache<K, V>) getTargetCache();
            Cache[] caches = targetCache.caches();
            int len = caches.length;

            CacheGetResult cacheGetResult = caches[len - 1].GET(key);
            if (!cacheGetResult.isSuccess()) {
                return;
            }
            for (int i = 0; i < len - 1; i  ) {
                caches[i].PUT(key, cacheGetResult.getValue());
            }
        }

        @Override
        public void run() {
            try {
                if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) {
                    cancel();
                    return;
                }
                long now = System.currentTimeMillis();
                long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
                if (stopRefreshAfterLastAccessMillis > 0) {
                    if (lastAccessTime   stopRefreshAfterLastAccessMillis < now) {
                        logger.debug("cancel refresh: {}", key);
                        cancel();
                        return;
                    }
                }
                logger.debug("refresh key: {}", key);
                Cache concreteCache = concreteCache();
                if (concreteCache instanceof AbstractExternalCache) {
                    externalLoad(concreteCache, now);
                } else {
                    load();
                }
            } catch (Throwable e) {
                logger.error("refresh error: key="   key, e);
            }
        }
    }

    protected Cache concreteCache() {
        Cache c = getTargetCache();
        while (true) {
            if (c instanceof ProxyCache) {
                c = ((ProxyCache) c).getTargetCache();
            } else if (c instanceof MultiLevelCache) {
                Cache[] caches = ((MultiLevelCache) c).caches();
                c = caches[caches.length - 1];
            } else {
                return c;
            }
        }
    }    

RefreshTask的run方法先判断stopRefreshAfterLastAccessMillis,若stopRefreshAfterLastAccessMillis大于0且lastAccessTime stopRefreshAfterLastAccessMillis小于now则执行cancel,然后返回;之后通过concreteCache获取具体的cache,对于AbstractExternalCache类型的执行externalLoad,否则执行load load方法通过loader去加载,对于needUpdate的执行PUT去更新;对于externalLoad的会先获取refreshTimeResult,对于CacheResultCode.NOT_EXISTS的则shouldLoad为true,对于refreshTimeResult.isSuccess()的对于currentTime大于等于Long.parseLong(refreshTimeResult.getValue().toString()) refreshMillis的则shouldLoad为true,对于非shouldLoad的且是multiLevelCache则执行refreshUpperCaches然后返回;否则使用concreteCache.tryLockAndRun去加锁然后执行load方法,若加锁不成功且是multiLevelCache的,则通过JetCacheExecutor.heavyIOExecutor().schedule去调度执行refreshUpperCaches

SimpleCacheManager.create

jetcache-core/src/main/java/com/alicp/jetcache/SimpleCacheManager.java

代码语言:javascript复制
    private Cache create(QuickConfig config) {
        Cache cache;
        if (config.getCacheType() == null || config.getCacheType() == CacheType.REMOTE) {
            cache = buildRemote(config);
        } else if (config.getCacheType() == CacheType.LOCAL) {
            cache = buildLocal(config);
        } else {
            Cache local = buildLocal(config);
            Cache remote = buildRemote(config);


            boolean useExpireOfSubCache = config.getLocalExpire() != null;
            cache = MultiLevelCacheBuilder.createMultiLevelCacheBuilder()
                    .expireAfterWrite(remote.config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS)
                    .addCache(local, remote)
                    .useExpireOfSubCache(useExpireOfSubCache)
                    .cacheNullValue(config.getCacheNullValue() != null ?
                            config.getCacheNullValue() : DEFAULT_CACHE_NULL_VALUE)
                    .buildCache();
        }
        if (config.getRefreshPolicy() != null) {
            cache = new RefreshCache(cache);
        } else if (config.getLoader() != null) {
            cache = new LoadingCache(cache);
        }
        cache.config().setRefreshPolicy(config.getRefreshPolicy());
        cache.config().setLoader(config.getLoader());


        boolean protect = config.getPenetrationProtect() != null ? config.getPenetrationProtect()
                : cacheBuilderTemplate.isPenetrationProtect();
        cache.config().setCachePenetrationProtect(protect);
        cache.config().setPenetrationProtectTimeout(config.getPenetrationProtectTimeout());

        for (CacheMonitorInstaller i : cacheBuilderTemplate.getCacheMonitorInstallers()) {
            i.addMonitors(this, cache, config);
        }
        return cache;
    }

SimpleCacheManager的create方法对于config.getRefreshPolicy()不为null的则创建RefreshCache去包装cache,否则再对于config.getLoader()不为null使用LoadingCache去包装cache

小结

ProxyCache继承了Cache接口,它定义了getTargetCache方法,并默认实现了unwrap方法;SimpleProxyCache实现了ProxyCache接口,它定义了cache属性,其方法均委托给了cache对象

LoadingCache继承了SimpleProxyCache,其get方法会判断config.getLoader()是否为null,不为null则执行AbstractCache.computeIfAbsentImpl(key, loader,config.isCacheNullValue() ,0, null, this);getAll方法多了loader以及config.isCachePenetrationProtect()的处理逻辑,对于开启cachePenetrationProtect的会通过CacheUtil.createProxyLoader包装一下loader,然后AbstractCache.synchronizedLoad去加载value RefreshCache继承了LoadingCache,它定义了taskMap用于存储RefreshTask,它覆盖了computeIfAbsent方法,复用了AbstractCache定义的静态方法computeIfAbsentImpl,传入的cache为this;其get及getAll方法对于config.getRefreshPolicy()及config.getLoader()不为null的会额外执行addOrUpdateRefreshTask SimpleCacheManager的create方法根据QuickConfig先构造local或者remote或者multiLevelCache,再根据config的loader或者refreshPolicy去创建包装类,对于config.getRefreshPolicy()不为null的则创建RefreshCache去包装cache,否则再对于config.getLoader()不为null使用LoadingCache去包装cache

0 人点赞