/*
 * JBoss, Home of Professional Open Source
 *
 * Copyright 2010 Red Hat, Inc. and/or its affiliates, and individual
 * contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.xnio;

import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.xnio._private.Messages.msg;

A buffer pooled allocator. This pool uses a series of buffer regions to back the returned pooled buffers. When the buffer is no longer needed, it should be freed back into the pool; failure to do so will cause the corresponding buffer area to be unavailable until the buffer is garbage-collected. If the buffer pool is no longer used, it is advisable to invoke clean() to make sure that direct allocated buffers can be reused by a future instance.
Author:David M. Lloyd, Flavia Rainone
Deprecated:See ByteBufferPool.
/** * A buffer pooled allocator. This pool uses a series of buffer regions to back the * returned pooled buffers. When the buffer is no longer needed, it should be freed back into the pool; failure * to do so will cause the corresponding buffer area to be unavailable until the buffer is garbage-collected. * * If the buffer pool is no longer used, it is advisable to invoke {@link #clean()} to make * sure that direct allocated buffers can be reused by a future instance. * * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a> * @author Flavia Rainone * @deprecated See {@link ByteBufferPool}. */
public final class ByteBufferSlicePool implements Pool<ByteBuffer> { private static final int LOCAL_LENGTH; private static final Queue<ByteBuffer> FREE_DIRECT_BUFFERS; static { // read thread local size property String value = AccessController.doPrivileged(new ReadPropertyAction("xnio.bufferpool.threadlocal.size", "12")); int val; try { val = Integer.parseInt(value); } catch (NumberFormatException ignored) { val = 12; } LOCAL_LENGTH = val; // free direct buffers queue to keep direct buffers that are out of reach because of garbage collection of pools FREE_DIRECT_BUFFERS = new ConcurrentLinkedQueue<>(); } private final Set<Ref> refSet = Collections.synchronizedSet(new HashSet<>()); private final Queue<Slice> sliceQueue; private final BufferAllocator<ByteBuffer> allocator; private final int bufferSize; private final int buffersPerRegion; private final int threadLocalQueueSize; private final List<ByteBuffer> directBuffers; private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocalCacheWrapper(this);
Construct a new instance.
Params:
  • allocator – the buffer allocator to use
  • bufferSize – the size of each buffer
  • maxRegionSize – the maximum region size for each backing buffer
  • threadLocalQueueSize – the number of buffers to cache on each thread
/** * Construct a new instance. * * @param allocator the buffer allocator to use * @param bufferSize the size of each buffer * @param maxRegionSize the maximum region size for each backing buffer * @param threadLocalQueueSize the number of buffers to cache on each thread */
public ByteBufferSlicePool(final BufferAllocator<ByteBuffer> allocator, final int bufferSize, final int maxRegionSize, final int threadLocalQueueSize) { if (bufferSize <= 0) { throw msg.parameterOutOfRange("bufferSize"); } if (maxRegionSize < bufferSize) { throw msg.parameterOutOfRange("bufferSize"); } buffersPerRegion = maxRegionSize / bufferSize; this.bufferSize = bufferSize; this.allocator = allocator; sliceQueue = new ConcurrentLinkedQueue<>(); this.threadLocalQueueSize = threadLocalQueueSize; // handle direct byte buffer allocation for reuse of direct buffers if (allocator == BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR) { directBuffers = Collections.synchronizedList(new ArrayList<>()); } else { directBuffers = null; } }
Construct a new instance.
Params:
  • allocator – the buffer allocator to use
  • bufferSize – the size of each buffer
  • maxRegionSize – the maximum region size for each backing buffer
/** * Construct a new instance. * * @param allocator the buffer allocator to use * @param bufferSize the size of each buffer * @param maxRegionSize the maximum region size for each backing buffer */
public ByteBufferSlicePool(final BufferAllocator<ByteBuffer> allocator, final int bufferSize, final int maxRegionSize) { this(allocator, bufferSize, maxRegionSize, LOCAL_LENGTH); }
Construct a new instance, using a direct buffer allocator.
Params:
  • bufferSize – the size of each buffer
  • maxRegionSize – the maximum region size for each backing buffer
/** * Construct a new instance, using a direct buffer allocator. * * @param bufferSize the size of each buffer * @param maxRegionSize the maximum region size for each backing buffer */
public ByteBufferSlicePool(final int bufferSize, final int maxRegionSize) { this(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, bufferSize, maxRegionSize); }
{@inheritDoc}
/** {@inheritDoc} */
public Pooled<ByteBuffer> allocate() { Slice slice; if (threadLocalQueueSize > 0) { ThreadLocalCache localCache = localQueueHolder.get(); if(localCache.outstanding != threadLocalQueueSize) { localCache.outstanding++; } slice = localCache.queue.poll(); if (slice != null) { return new PooledByteBuffer(slice, slice.slice()); } } final Queue<Slice> sliceQueue = this.sliceQueue; slice = sliceQueue.poll(); if (slice != null) { return new PooledByteBuffer(slice, slice.slice()); } synchronized (sliceQueue) { slice = sliceQueue.poll(); if (slice != null) { return new PooledByteBuffer(slice, slice.slice()); } final Slice newSlice = allocateSlices(buffersPerRegion, bufferSize); return new PooledByteBuffer(newSlice, newSlice.slice()); } } private Slice allocateSlices(final int buffersPerRegion, final int bufferSize) { // only true if using direct allocation if (directBuffers != null) { ByteBuffer region = FREE_DIRECT_BUFFERS.poll(); try { if (region != null) { return sliceReusedBuffer(region, buffersPerRegion, bufferSize); } region = allocator.allocate(buffersPerRegion * bufferSize); return sliceAllocatedBuffer(region, buffersPerRegion, bufferSize); } finally { // add all directly allocated memory to directBuffers, so it can // be added to FREE_DIRECT_BUFFERS on clean() directBuffers.add(region); } } return sliceAllocatedBuffer( allocator.allocate(buffersPerRegion * bufferSize), buffersPerRegion, bufferSize); } private Slice sliceReusedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) { int maxI = Math.min(buffersPerRegion, region.capacity() / bufferSize); // create slices int idx = bufferSize; for (int i = 1; i < maxI; i++) { sliceQueue.add(new Slice(region, idx, bufferSize)); idx += bufferSize; } if (maxI == 0) return allocateSlices(buffersPerRegion, bufferSize); if (maxI < buffersPerRegion) sliceQueue.add(allocateSlices(buffersPerRegion - maxI, bufferSize)); return new Slice(region, 0, bufferSize); } private Slice sliceAllocatedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) { // create slices int idx = bufferSize; for (int i = 1; i < buffersPerRegion; i++) { sliceQueue.add(new Slice(region, idx, bufferSize)); idx += bufferSize; } return new Slice(region, 0, bufferSize); }
Cleans the pool, removing references to any buffers inside it. Should be invoked on pool disposal, when the pool will no longer be used.
/** * Cleans the pool, removing references to any buffers inside it. * Should be invoked on pool disposal, when the pool will no longer be * used. */
public void clean() { ThreadLocalCache localCache = localQueueHolder.get(); if (!localCache.queue.isEmpty()) { localCache.queue.clear(); } if(!sliceQueue.isEmpty()) { sliceQueue.clear(); } // only true if using direct allocation if (directBuffers != null) { // pass everything that is directly allocated to free direct buffers FREE_DIRECT_BUFFERS.addAll(directBuffers); } }
Return the size of the ByteBuffers that are returned by allocate().
/** * Return the size of the {@link ByteBuffer}s that are returned by {@link #allocate()}. */
public int getBufferSize() { return bufferSize; } private ThreadLocalCache createThreadLocalCache() { return new ThreadLocalCache(this); } private void freeThreadLocalCache(ThreadLocalCache cache) { final ArrayDeque<Slice> deque = cache.queue; Slice slice = deque.poll(); while (slice != null) { doFree(slice); slice = deque.poll(); } } private void doFree(Slice region) { if (threadLocalQueueSize > 0) { final ThreadLocalCache localCache = localQueueHolder.get(); boolean cacheOk = false; if(localCache.outstanding > 0) { localCache.outstanding--; cacheOk = true; } ArrayDeque<Slice> localQueue = localCache.queue; if (localQueue.size() == threadLocalQueueSize || !cacheOk) { sliceQueue.add(region); } else { localQueue.add(region); } } else { sliceQueue.add(region); } } private final class PooledByteBuffer implements Pooled<ByteBuffer> { private final Slice region; ByteBuffer buffer; PooledByteBuffer(final Slice region, final ByteBuffer buffer) { this.region = region; this.buffer = buffer; } public void discard() { final ByteBuffer buffer = this.buffer; this.buffer = null; if (buffer != null) { // free when GC'd, no sooner refSet.add(new Ref(buffer, region)); } } public void free() { ByteBuffer buffer = this.buffer; this.buffer = null; if (buffer != null) { // trust the user, repool the buffer doFree(region); } } public ByteBuffer getResource() { final ByteBuffer buffer = this.buffer; if (buffer == null) { throw msg.bufferFreed(); } return buffer; } public void close() { free(); } public String toString() { return "Pooled buffer " + buffer; } } // to prevent memory leaks via thread internal map for thread local, we need to // make this class static or else the outer ByteBufferSlicePool // is never collected while the thread is active // Thread -> thread local map -> ThreadLocalCacheWrapper -> ThreadLocalCache -> queue -> Slices -> ByteBufferSlicePool private static final class Slice { private final ByteBuffer parent; private Slice(final ByteBuffer parent, final int start, final int size) { this.parent = (ByteBuffer) parent.duplicate().position(start).limit(start+size); } ByteBuffer slice() { return parent.slice(); } } final class Ref extends AutomaticReference<ByteBuffer> { private final Slice region; private Ref(final ByteBuffer referent, final Slice region) { super(referent, AutomaticReference.PERMIT); this.region = region; } protected void free() { doFree(region); refSet.remove(this); } } final static class ThreadLocalCache { // to prevent memory leaks via thread internal map for thread local, we need to // weakly reference the outer ByteBufferSlicePool // or else the pool is never collected while the thread is active // Thread -> thread local map -> ThreadLocalCache -> pool final WeakReference<ByteBufferSlicePool> pool; // internal queue of slices; used to prevent all threads synchronizing on a single queue final ArrayDeque<Slice> queue; // indicates how many slices should be returned to queue on free int outstanding = 0; ThreadLocalCache(ByteBufferSlicePool pool) { this.pool = new WeakReference<>(pool); this.queue = new ArrayDeque<Slice>(pool.threadLocalQueueSize) {
This sucks but there's no other way to ensure these buffers are returned to the pool.
/** * This sucks but there's no other way to ensure these buffers are returned to the pool. */
protected void finalize() { final ByteBufferSlicePool pool = ThreadLocalCache.this.pool.get(); if (pool == null) return; final ArrayDeque<Slice> deque = queue; Slice slice = deque.poll(); while (slice != null) { pool.doFree(slice); slice = deque.poll(); } } }; } } private static class ThreadLocalCacheWrapper extends ThreadLocal<ThreadLocalCache> { // to prevent memory leaks via thread internal map for thread local, we need to // weakly reference the outer ByteBufferSlicePool // or else the pool is never collected while the thread is active // Thread -> thread local map -> ThreadLocalCacheWrapper -> pool private final WeakReference<ByteBufferSlicePool> pool; ThreadLocalCacheWrapper(ByteBufferSlicePool pool) { this.pool = new WeakReference<>(pool); } protected ThreadLocalCache initialValue() { final ByteBufferSlicePool pool = this.pool.get(); if (pool != null) { //noinspection serial return pool.createThreadLocalCache(); } return null; } public void remove() { final ByteBufferSlicePool pool = this.pool.get(); final ThreadLocalCache cache = get(); if (pool != null && cache != null) { //noinspection serial pool.freeThreadLocalCache(cache); } super.remove(); } } }