A Look at httpclient's CPool

2023-10-06 14:26:33

This article takes a look at httpclient's CPool.

ConnPool

org/apache/http/pool/ConnPool.java

public interface ConnPool<T, E> {

    /**
     * Attempts to lease a connection for the given route and with the given
     * state from the pool.
     *
     * @param route route of the connection.
     * @param state arbitrary object that represents a particular state
     *  (usually a security principal or a unique token identifying
     *  the user whose credentials have been used while establishing the connection).
     *  May be {@code null}.
     * @param callback operation completion callback.
     *
     * @return future for a leased pool entry.
     */
    Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);

    /**
     * Releases the pool entry back to the pool.
     *
     * @param entry pool entry leased from the pool
     * @param reusable flag indicating whether or not the released connection
     *   is in a consistent state and is safe for further use.
     */
    void release(E entry, boolean reusable);

}

ConnPool defines the lease and release methods and declares two type parameters: T is the route and E is the pool entry.
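The lease/release contract can be illustrated with a minimal, stdlib-only sketch. `TinyPool`, its `Entry` class, and the use of `String` as the route are hypothetical stand-ins chosen for illustration; they are not HttpCore classes, and the sketch ignores limits, state, and callbacks.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the ConnPool<T, E> contract (T = String, E = Entry).
class TinyPool {
    static final class Entry {
        final String route;
        final int id;
        Entry(String route, int id) { this.route = route; this.id = id; }
    }

    // Idle entries per route; the most recently released entry sits at the head.
    private final Map<String, Deque<Entry>> idle = new HashMap<>();
    private int created = 0;

    // lease: hand out an idle entry for the route, or create one if none is idle.
    // CompletableFuture is a Future, so this mirrors the Future<E> return type.
    synchronized CompletableFuture<Entry> lease(String route) {
        Deque<Entry> queue = idle.computeIfAbsent(route, r -> new ArrayDeque<>());
        Entry entry = queue.pollFirst();
        if (entry == null) {
            entry = new Entry(route, ++created);
        }
        return CompletableFuture.completedFuture(entry);
    }

    // release: a reusable entry goes back to the idle queue; otherwise it is dropped.
    synchronized void release(Entry entry, boolean reusable) {
        if (reusable) {
            idle.computeIfAbsent(entry.route, r -> new ArrayDeque<>()).addFirst(entry);
        }
    }

    synchronized int createdCount() { return created; }
}
```

Leasing, releasing with `reusable = true`, and leasing again for the same route returns the same entry; releasing with `reusable = false` forces the next lease to create a new one.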

ConnPoolControl

public interface ConnPoolControl<T> {

    void setMaxTotal(int max);

    int getMaxTotal();

    void setDefaultMaxPerRoute(int max);

    int getDefaultMaxPerRoute();

    void setMaxPerRoute(final T route, int max);

    int getMaxPerRoute(final T route);

    PoolStats getTotalStats();

    PoolStats getStats(final T route);

}

The ConnPoolControl interface defines methods for setting and reading maxTotal, defaultMaxPerRoute, and the per-route maximum, as well as for obtaining PoolStats.
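As a rough illustration of how these settings relate, the sketch below assumes that a route without an explicit limit falls back to the default per-route maximum. `RouteLimits` and the `String` route type are hypothetical names used only for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-route limits with a default fallback.
class RouteLimits {
    private final Map<String, Integer> maxPerRoute = new HashMap<>();
    private int defaultMaxPerRoute;
    private int maxTotal;

    RouteLimits(int defaultMaxPerRoute, int maxTotal) {
        this.defaultMaxPerRoute = defaultMaxPerRoute;
        this.maxTotal = maxTotal;
    }

    void setMaxTotal(int max) { this.maxTotal = max; }
    int getMaxTotal() { return maxTotal; }

    void setDefaultMaxPerRoute(int max) { this.defaultMaxPerRoute = max; }
    int getDefaultMaxPerRoute() { return defaultMaxPerRoute; }

    // Explicit override for a single route.
    void setMaxPerRoute(String route, int max) { maxPerRoute.put(route, max); }

    // Assumption: routes without an override use the default limit.
    int getMaxPerRoute(String route) {
        Integer value = maxPerRoute.get(route);
        return value != null ? value : defaultMaxPerRoute;
    }
}
```

With a default of 2, a route that was given an override of 5 reports 5, while every other route still reports 2.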

AbstractConnPool

org/apache/http/pool/AbstractConnPool.java

@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                                               implements ConnPool<T, E>, ConnPoolControl<T> {

    private final Lock lock;
    private final Condition condition;
    private final ConnFactory<T, C> connFactory;
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    private final Set<E> leased;
    private final LinkedList<E> available;
    private final LinkedList<Future<E>> pending;
    private final Map<T, Integer> maxPerRoute;

    private volatile boolean isShutDown;
    private volatile int defaultMaxPerRoute;
    private volatile int maxTotal;
    private volatile int validateAfterInactivity;

    public AbstractConnPool(
            final ConnFactory<T, C> connFactory,
            final int defaultMaxPerRoute,
            final int maxTotal) {
        super();
        this.connFactory = Args.notNull(connFactory, "Connection factory");
        this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
        this.maxTotal = Args.positive(maxTotal, "Max total value");
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
        this.leased = new HashSet<E>();
        this.available = new LinkedList<E>();
        this.pending = new LinkedList<Future<E>>();
        this.maxPerRoute = new HashMap<T, Integer>();
    }

    /**
     * Creates a new entry for the given connection with the given route.
     */
    protected abstract E createEntry(T route, C conn);

    //......
}

AbstractConnPool implements the ConnPool and ConnPoolControl interfaces. It requires E to extend PoolEntry and introduces a third type parameter, C, for the connection type.

shutdown

    public void shutdown() throws IOException {
        if (this.isShutDown) {
            return ;
        }
        this.isShutDown = true;
        this.lock.lock();
        try {
            for (final E entry: this.available) {
                entry.close();
            }
            for (final E entry: this.leased) {
                entry.close();
            }
            for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
                pool.shutdown();
            }
            this.routeToPool.clear();
            this.leased.clear();
            this.available.clear();
        } finally {
            this.lock.unlock();
        }
    }

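shutdown closes every entry in available and leased, then calls shutdown on each RouteSpecificPool in routeToPool and clears routeToPool, leased, and available. Because of the isShutDown flag, repeated calls return immediately.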

lease

    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");

        return new Future<E>() {

            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);

            @Override
            public boolean cancel(final boolean mayInterruptIfRunning) {
                if (done.compareAndSet(false, true)) {
                    cancelled.set(true);
                    lock.lock();
                    try {
                        condition.signalAll();
                    } finally {
                        lock.unlock();
                    }
                    if (callback != null) {
                        callback.cancelled();
                    }
                    return true;
                }
                return false;
            }

            @Override
            public boolean isCancelled() {
                return cancelled.get();
            }

            @Override
            public boolean isDone() {
                return done.get();
            }

            @Override
            public E get() throws InterruptedException, ExecutionException {
                try {
                    return get(0L, TimeUnit.MILLISECONDS);
                } catch (final TimeoutException ex) {
                    throw new ExecutionException(ex);
                }
            }

            @Override
            public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                for (;;) {
                    synchronized (this) {
                        try {
                            final E entry = entryRef.get();
                            if (entry != null) {
                                return entry;
                            }
                            if (done.get()) {
                                throw new ExecutionException(operationAborted());
                            }
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                            if (validateAfterInactivity > 0)  {
                                if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                    if (!validate(leasedEntry)) {
                                        leasedEntry.close();
                                        release(leasedEntry, false);
                                        continue;
                                    }
                                }
                            }
                            if (done.compareAndSet(false, true)) {
                                entryRef.set(leasedEntry);
                                done.set(true);
                                onLease(leasedEntry);
                                if (callback != null) {
                                    callback.completed(leasedEntry);
                                }
                                return leasedEntry;
                            } else {
                                release(leasedEntry, true);
                                throw new ExecutionException(operationAborted());
                            }
                        } catch (final IOException ex) {
                            if (done.compareAndSet(false, true)) {
                                if (callback != null) {
                                    callback.failed(ex);
                                }
                            }
                            throw new ExecutionException(ex);
                        }
                    }
                }
            }

        };
    }

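lease returns a Future whose main methods are get and cancel. get delegates to getPoolEntryBlocking. If validateAfterInactivity is greater than 0 and the entry has been idle for at least that long (getUpdated() + validateAfterInactivity <= current time), the entry is validated; if validation fails, get calls leasedEntry.close() and release(leasedEntry, false), then loops to obtain another entry.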
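The inactivity check inside get can be isolated as a small pure function. This is only a sketch of the condition; `InactivityCheck` and `needsValidation` are hypothetical names, and the current time is passed in as a parameter so the behaviour is easy to verify.

```java
// Hypothetical helper that isolates the validate-after-inactivity condition.
class InactivityCheck {
    // Returns true when validation is enabled (validateAfterInactivity > 0) and the
    // entry has not been updated for at least validateAfterInactivity milliseconds.
    static boolean needsValidation(long lastUpdatedMillis,
                                   int validateAfterInactivity,
                                   long nowMillis) {
        return validateAfterInactivity > 0
                && lastUpdatedMillis + validateAfterInactivity <= nowMillis;
    }
}
```

For an entry last updated at t = 1000 ms with a 2000 ms threshold, no validation happens at t = 2500 ms, but it does at t = 3000 ms. A threshold of 0 or less disables the check entirely.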

getPoolEntryBlocking

org/apache/http/pool/AbstractConnPool.java

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit timeUnit,
            final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date(System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try {
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new ExecutionException(operationAborted());
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

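getPoolEntryBlocking first looks up the RouteSpecificPool for the route in routeToPool and calls pool.getFree(state). An expired entry is closed, and closed entries are removed from available and freed; a usable entry is moved from available to leased, onReuse is invoked, and the entry is returned. If there is no free entry, the method first shrinks the route pool so that one more connection fits under the per-route limit. If both the per-route limit and maxTotal allow it, a new connection is created with connFactory.create(route), after evicting the last entry of available when the idle entries would otherwise leave no room. If no connection can be created, the future is queued and the thread waits on the condition until it is signalled or the deadline passes; on timeout a TimeoutException is thrown.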
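The capacity arithmetic in getPoolEntryBlocking is easier to follow when extracted into pure functions. The sketch below restates the expressions from the listing; the class and method names (`CapacityMath`, `excess`, `canCreate`, `mustEvictIdle`) are hypothetical.

```java
// Hypothetical extraction of the capacity checks; not part of HttpCore.
class CapacityMath {
    // Number of idle entries of this route to close so that allocating
    // one more connection stays within maxPerRoute.
    static int excess(int routeAllocated, int maxPerRoute) {
        return Math.max(0, routeAllocated + 1 - maxPerRoute);
    }

    // A new connection may be created only if the route is below its limit
    // and the leased connections have not used up maxTotal.
    static boolean canCreate(int routeAllocated, int maxPerRoute,
                             int totalLeased, int maxTotal) {
        if (routeAllocated >= maxPerRoute) {
            return false;
        }
        int freeCapacity = Math.max(maxTotal - totalLeased, 0);
        return freeCapacity > 0;
    }

    // Before creating, one idle entry (of any route) is evicted when the idle
    // entries would otherwise push the total past maxTotal.
    static boolean mustEvictIdle(int totalAvailable, int totalLeased, int maxTotal) {
        int freeCapacity = Math.max(maxTotal - totalLeased, 0);
        return freeCapacity > 0 && totalAvailable > freeCapacity - 1;
    }
}
```

For example, with maxTotal = 5, four leased connections, and one idle connection, `canCreate` is true for a route below its limit, but `mustEvictIdle` is also true: the idle connection is closed first so that the total stays at five.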

release

    @Override
    public void release(final E entry, final boolean reusable) {
        this.lock.lock();
        try {
            if (this.leased.remove(entry)) {
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown) {
                    this.available.addFirst(entry);
                } else {
                    entry.close();
                }
                onRelease(entry);
                Future<E> future = pool.nextPending();
                if (future != null) {
                    this.pending.remove(future);
                } else {
                    future = this.pending.poll();
                }
                if (future != null) {
                    this.condition.signalAll();
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

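release first removes the entry from leased, looks up its RouteSpecificPool, and calls pool.free(entry, reusable). A reusable entry is pushed to the head of available, provided the pool is not shut down; otherwise it is closed. After the onRelease callback, if any future is pending, release calls condition.signalAll() to wake the waiting threads.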
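The interplay between a blocking lease and release can be sketched with a ReentrantLock and a Condition. `BoundedTokenPool` is a hypothetical, heavily simplified model: it hands out integer tokens instead of connections, has a single route, and, unlike AbstractConnPool, treats a timeout of 0 as "do not wait" rather than "wait indefinitely".

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-route model of the lock/condition handshake.
class BoundedTokenPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Deque<Integer> available = new ArrayDeque<>();
    private final int max;
    private int allocated = 0;

    BoundedTokenPool(int max) { this.max = max; }

    Integer lease(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        long remainingNanos = unit.toNanos(timeout);
        lock.lock();
        try {
            for (;;) {
                // Reuse an idle token first.
                Integer token = available.pollFirst();
                if (token != null) {
                    return token;
                }
                // Otherwise create a new one while below the limit.
                if (allocated < max) {
                    return ++allocated;
                }
                // Pool exhausted: wait for a release or give up on timeout.
                if (remainingNanos <= 0L) {
                    throw new TimeoutException("Timeout waiting for token");
                }
                remainingNanos = condition.awaitNanos(remainingNanos);
            }
        } finally {
            lock.unlock();
        }
    }

    void release(Integer token) {
        lock.lock();
        try {
            // The most recently released token goes to the head, as with available.addFirst.
            available.addFirst(token);
            // Wake all waiters; each one rechecks the pool state in its loop.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Waiters recheck the pool state after every wakeup, which covers spurious wakeups as well as the case where another thread took the released token first.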

CPool

org/apache/http/impl/conn/CPool.java

@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {

    private static final AtomicLong COUNTER = new AtomicLong();

    private final Log log = LogFactory.getLog(CPool.class);
    private final long timeToLive;
    private final TimeUnit timeUnit;

    public CPool(
            final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
            final int defaultMaxPerRoute, final int maxTotal,
            final long timeToLive, final TimeUnit timeUnit) {
        super(connFactory, defaultMaxPerRoute, maxTotal);
        this.timeToLive = timeToLive;
        this.timeUnit = timeUnit;
    }

    @Override
    protected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {
        final String id = Long.toString(COUNTER.getAndIncrement());
        return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);
    }

    @Override
    protected boolean validate(final CPoolEntry entry) {
        return !entry.getConnection().isStale();
    }

    @Override
    protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumAvailable(callback);
    }

    @Override
    protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumLeased(callback);
    }

}

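CPool extends AbstractConnPool with T as HttpRoute, C as ManagedHttpClientConnection, and E as CPoolEntry. Its createEntry method creates a CPoolEntry with an ID taken from a shared counter, and validate checks whether the connection is stale.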

Summary

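ConnPool defines the lease and release methods and declares two type parameters: T is the route and E is the pool entry. AbstractConnPool implements the ConnPool and ConnPoolControl interfaces; it requires E to extend PoolEntry and introduces a type parameter C for the connection type. CPool extends AbstractConnPool with T as HttpRoute, C as ManagedHttpClientConnection, and E as CPoolEntry.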

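AbstractConnPool's lease method returns a Future whose main methods are get and cancel. get delegates to getPoolEntryBlocking; when validateAfterInactivity is greater than 0 and the entry has been idle long enough, the entry is validated, and if validation fails get calls leasedEntry.close() and release before retrying. release looks up the RouteSpecificPool, calls pool.free(entry, reusable), returns a reusable entry to available or closes it, and signals any pending waiters.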
