/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.mongodb.internal.connection;

import com.mongodb.MongoInternalException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.internal.connection.ConcurrentLinkedDeque.RemovalReportingIterator;

import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

A concurrent pool implementation.

This class should not be considered a part of the public API.

/** * A concurrent pool implementation. * * <p>This class should not be considered a part of the public API.</p> */
public class ConcurrentPool<T> implements Pool<T> { private final int maxSize; private final ItemFactory<T> itemFactory; private final ConcurrentLinkedDeque<T> available = new ConcurrentLinkedDeque<T>(); private final Semaphore permits; private volatile boolean closed; public enum Prune {
Prune this element
/** * Prune this element */
YES,
Don't prone this element
/** * Don't prone this element */
NO,
Don't prune this element and stop attempting to prune additional elements
/** * Don't prune this element and stop attempting to prune additional elements */
STOP }
Factory for creating and closing pooled items.
Type parameters:
  • <T> –
/** * Factory for creating and closing pooled items. * * @param <T> */
public interface ItemFactory<T> { T create(boolean initialize); void close(T t); Prune shouldPrune(T t); }
Initializes a new pool of objects.
Params:
  • maxSize – max to hold to at any given time. if < 0 then no limit
  • itemFactory – factory used to create and close items in the pool
/** * Initializes a new pool of objects. * * @param maxSize max to hold to at any given time. if < 0 then no limit * @param itemFactory factory used to create and close items in the pool */
public ConcurrentPool(final int maxSize, final ItemFactory<T> itemFactory) { this.maxSize = maxSize; this.itemFactory = itemFactory; permits = new Semaphore(maxSize, true); }
Return an instance of T to the pool. This method simply calls release(t, false)
Params:
  • t – item to return to the pool
/** * Return an instance of T to the pool. This method simply calls {@code release(t, false)} * * @param t item to return to the pool */
@Override public void release(final T t) { release(t, false); }
call done when you are done with an object from the pool if there is room and the object is ok will get added
Params:
  • t – item to return to the pool
  • prune – true if the item should be closed, false if it should be put back in the pool
/** * call done when you are done with an object from the pool if there is room and the object is ok will get added * * @param t item to return to the pool * @param prune true if the item should be closed, false if it should be put back in the pool */
@Override public void release(final T t, final boolean prune) { if (t == null) { throw new IllegalArgumentException("Can not return a null item to the pool"); } if (closed) { close(t); return; } if (prune) { close(t); } else { available.addLast(t); } releasePermit(); }
Gets an object from the pool. This method will block until a permit is available.
Returns:An object from the pool.
/** * Gets an object from the pool. This method will block until a permit is available. * * @return An object from the pool. */
@Override public T get() { return get(-1, TimeUnit.MILLISECONDS); }
Gets an object from the pool - will block if none are available
Params:
  • timeout – negative - forever 0 - return immediately no matter what positive ms to wait
  • timeUnit – the time unit of the timeout
Throws:
Returns:An object from the pool, or null if can't get one in the given waitTime
/** * Gets an object from the pool - will block if none are available * * @param timeout negative - forever 0 - return immediately no matter what positive ms to wait * @param timeUnit the time unit of the timeout * @return An object from the pool, or null if can't get one in the given waitTime * @throws MongoTimeoutException if the timeout has been exceeded */
@Override public T get(final long timeout, final TimeUnit timeUnit) { if (closed) { throw new IllegalStateException("The pool is closed"); } if (!acquirePermit(timeout, timeUnit)) { throw new MongoTimeoutException(String.format("Timeout waiting for a pooled item after %d %s", timeout, timeUnit)); } T t = available.pollLast(); if (t == null) { t = createNewAndReleasePermitIfFailure(false); } return t; } public void prune() { for (RemovalReportingIterator<T> iter = available.iterator(); iter.hasNext();) { T cur = iter.next(); Prune shouldPrune = itemFactory.shouldPrune(cur); if (shouldPrune == Prune.STOP) { break; } if (shouldPrune == Prune.YES) { boolean removed = iter.reportingRemove(); if (removed) { close(cur); } } } } public void ensureMinSize(final int minSize, final boolean initialize) { while (getCount() < minSize) { if (!acquirePermit(10, TimeUnit.MILLISECONDS)) { break; } release(createNewAndReleasePermitIfFailure(initialize)); } } private T createNewAndReleasePermitIfFailure(final boolean initialize) { try { T newMember = itemFactory.create(initialize); if (newMember == null) { throw new MongoInternalException("The factory for the pool created a null item"); } return newMember; } catch (RuntimeException e) { permits.release(); throw e; } } protected boolean acquirePermit(final long timeout, final TimeUnit timeUnit) { try { if (closed) { return false; } else if (timeout >= 0) { return permits.tryAcquire(timeout, timeUnit); } else { permits.acquire(); return true; } } catch (InterruptedException e) { throw new MongoInterruptedException("Interrupted acquiring a permit to retrieve an item from the pool ", e); } } protected void releasePermit() { permits.release(); }
Clears the pool of all objects.
/** * Clears the pool of all objects. */
@Override public void close() { closed = true; Iterator<T> iter = available.iterator(); while (iter.hasNext()) { T t = iter.next(); close(t); iter.remove(); } } public int getMaxSize() { return maxSize; } public int getInUseCount() { return maxSize - permits.availablePermits(); } public int getAvailableCount() { return available.size(); } public int getCount() { return getInUseCount() + getAvailableCount(); } public String toString() { StringBuilder buf = new StringBuilder(); buf.append("pool: ") .append(" maxSize: ").append(maxSize) .append(" availableCount ").append(getAvailableCount()) .append(" inUseCount ").append(getInUseCount()); return buf.toString(); } // swallow exceptions from ItemFactory.close() private void close(final T t) { try { itemFactory.close(t); } catch (RuntimeException e) { // ItemFactory.close() really should not throw } } }