/*
 * Copyright 2013 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.util;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ObjectCleaner;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
import static java.lang.Math.max;
import static java.lang.Math.min;

Light-weight object pool based on a thread-local stack.
Type parameters:
  • <T> – the type of the pooled object
/** * Light-weight object pool based on a thread-local stack. * * @param <T> the type of the pooled object */
public abstract class Recycler<T> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class); @SuppressWarnings("rawtypes") private static final Handle NOOP_HANDLE = new Handle() { @Override public void recycle(Object object) { // NOOP } }; private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE); private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement(); private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default. private static final int DEFAULT_MAX_CAPACITY_PER_THREAD; private static final int INITIAL_CAPACITY; private static final int MAX_SHARED_CAPACITY_FACTOR; private static final int MAX_DELAYED_QUEUES_PER_THREAD; private static final int LINK_CAPACITY; private static final int RATIO; static { // In the future, we might have different maxCapacity for different object types. // e.g. io.netty.recycler.maxCapacity.writeTask // io.netty.recycler.maxCapacity.outboundBuffer int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread", SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD)); if (maxCapacityPerThread < 0) { maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD; } DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread; MAX_SHARED_CAPACITY_FACTOR = max(2, SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2)); MAX_DELAYED_QUEUES_PER_THREAD = max(0, SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", // We use the same value as default EventLoop number NettyRuntime.availableProcessors() * 2)); LINK_CAPACITY = safeFindNextPositivePowerOfTwo( max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before. // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation // bursts. RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8)); if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) { logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled"); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled"); logger.debug("-Dio.netty.recycler.linkCapacity: disabled"); logger.debug("-Dio.netty.recycler.ratio: disabled"); } else { logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR); logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY); logger.debug("-Dio.netty.recycler.ratio: {}", RATIO); } } INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256); } private final int maxCapacityPerThread; private final int maxSharedCapacityFactor; private final int ratioMask; private final int maxDelayedQueuesPerThread; private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Stack<T> value) { // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } }; protected Recycler() { this(DEFAULT_MAX_CAPACITY_PER_THREAD); } protected Recycler(int maxCapacityPerThread) { this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR); } protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) { this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD); } protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) { ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1; if (maxCapacityPerThread <= 0) { this.maxCapacityPerThread = 0; this.maxSharedCapacityFactor = 1; this.maxDelayedQueuesPerThread = 0; } else { this.maxCapacityPerThread = maxCapacityPerThread; this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor); this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread); } } @SuppressWarnings("unchecked") public final T get() { if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
Deprecated:use Handle.recycle(Object).
/** * @deprecated use {@link Handle#recycle(Object)}. */
@Deprecated public final boolean recycle(T o, Handle<T> handle) { if (handle == NOOP_HANDLE) { return false; } DefaultHandle<T> h = (DefaultHandle<T>) handle; if (h.stack.parent != this) { return false; } h.recycle(o); return true; } final int threadLocalCapacity() { return threadLocal.get().elements.length; } final int threadLocalSize() { return threadLocal.get().size; } protected abstract T newObject(Handle<T> handle); public interface Handle<T> { void recycle(T object); } static final class DefaultHandle<T> implements Handle<T> { private int lastRecycledId; private int recycleId; boolean hasBeenRecycled; private Stack<?> stack; private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } stack.push(this); } } private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() { @Override protected Map<Stack<?>, WeakOrderQueue> initialValue() { return new WeakHashMap<Stack<?>, WeakOrderQueue>(); } }; // a queue that makes only moderate guarantees about visibility: items are seen in the correct order, // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain private static final class WeakOrderQueue { static final WeakOrderQueue DUMMY = new WeakOrderQueue(); // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex. @SuppressWarnings("serial") static final class Link extends AtomicInteger { private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY]; private int readIndex; Link next; } // This act as a place holder for the head Link but also will be used by the ObjectCleaner // to return space that was before reserved. Its important this does not hold any reference to // either Stack or WeakOrderQueue. static final class Head implements Runnable { private final AtomicInteger availableSharedCapacity; Link link; Head(AtomicInteger availableSharedCapacity) { this.availableSharedCapacity = availableSharedCapacity; } @Override public void run() { Link head = link; while (head != null) { reclaimSpace(LINK_CAPACITY); head = head.next; } } void reclaimSpace(int space) { assert space >= 0; availableSharedCapacity.addAndGet(space); } boolean reserveSpace(int space) { return reserveSpace(availableSharedCapacity, space); } static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) { assert space >= 0; for (;;) { int available = availableSharedCapacity.get(); if (available < space) { return false; } if (availableSharedCapacity.compareAndSet(available, available - space)) { return true; } } } } // chain of data items private final Head head; private Link tail; // pointer to another queue of delayed items for the same stack private WeakOrderQueue next; private final WeakReference<Thread> owner; private final int id = ID_GENERATOR.getAndIncrement(); private WeakOrderQueue() { owner = null; head = new Head(null); } private WeakOrderQueue(Stack<?> stack, Thread thread) { tail = new Link(); // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the // Stack itself GCed. head = new Head(stack.availableSharedCapacity); head.link = tail; owner = new WeakReference<Thread>(thread); } static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) { final WeakOrderQueue queue = new WeakOrderQueue(stack, thread); // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so // may be accessed while its still constructed. stack.setHead(queue); // We need to reclaim all space that was reserved by this WeakOrderQueue so we not run out of space in // the stack. This is needed as we not have a good life-time control over the queue as it is used in a // WeakHashMap which will drop it at any time. final Head head = queue.head; ObjectCleaner.register(queue, head); return queue; } private void setNext(WeakOrderQueue next) { assert next != this; this.next = next; }
Allocate a new WeakOrderQueue or return null if not possible.
/** * Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible. */
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) { // We allocated a Link so reserve the space return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY) ? newQueue(stack, thread) : null; } void add(DefaultHandle<?> handle) { handle.lastRecycledId = id; Link tail = this.tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { if (!head.reserveSpace(LINK_CAPACITY)) { // Drop it. return; } // We allocate a Link so reserve the space this.tail = tail = tail.next = new Link(); writeIndex = tail.get(); } tail.elements[writeIndex] = handle; handle.stack = null; // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; // this also means we guarantee visibility of an element in the queue if we see the index updated tail.lazySet(writeIndex + 1); } boolean hasFinalData() { return tail.readIndex != tail.get(); } // transfer as many items as we can from this queue to the stack, returning true if any were transferred @SuppressWarnings("rawtypes") boolean transfer(Stack<?> dst) { Link head = this.head.link; if (head == null) { return false; } if (head.readIndex == LINK_CAPACITY) { if (head.next == null) { return false; } this.head.link = head = head.next; } final int srcStart = head.readIndex; int srcEnd = head.get(); final int srcSize = srcEnd - srcStart; if (srcSize == 0) { return false; } final int dstSize = dst.size; final int expectedCapacity = dstSize + srcSize; if (expectedCapacity > dst.elements.length) { final int actualCapacity = dst.increaseCapacity(expectedCapacity); srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd); } if (srcStart != srcEnd) { final DefaultHandle[] srcElems = head.elements; final DefaultHandle[] dstElems = dst.elements; int newDstSize = dstSize; for (int i = srcStart; i < srcEnd; i++) { DefaultHandle element = srcElems[i]; if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } srcElems[i] = null; if (dst.dropHandle(element)) { // Drop the object. continue; } element.stack = dst; dstElems[newDstSize ++] = element; } if (srcEnd == LINK_CAPACITY && head.next != null) { // Add capacity back as the Link is GCed. this.head.reclaimSpace(LINK_CAPACITY); this.head.link = head.next; } head.readIndex = srcEnd; if (dst.size == newDstSize) { return false; } dst.size = newDstSize; return true; } else { // The destination stack is full already. return false; } } } static final class Stack<T> { // we keep a queue of per-thread queues, which is appended to once only, each time a new thread other // than the stack owner recycles: when we run out of items in our stack we iterate this collection // to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst // still recycling all items. final Recycler<T> parent; // We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong // Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack. // // The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if // the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear // it in a timely manner). final WeakReference<Thread> threadRef; final AtomicInteger availableSharedCapacity; final int maxDelayedQueues; private final int maxCapacity; private final int ratioMask; private DefaultHandle<?>[] elements; private int size; private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled. private WeakOrderQueue cursor, prev; private volatile WeakOrderQueue head; Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask, int maxDelayedQueues) { this.parent = parent; threadRef = new WeakReference<Thread>(thread); this.maxCapacity = maxCapacity; availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; this.ratioMask = ratioMask; this.maxDelayedQueues = maxDelayedQueues; } // Marked as synchronized to ensure this is serialized. synchronized void setHead(WeakOrderQueue queue) { queue.setNext(head); head = queue; } int increaseCapacity(int expectedCapacity) { int newCapacity = elements.length; int maxCapacity = this.maxCapacity; do { newCapacity <<= 1; } while (newCapacity < expectedCapacity && newCapacity < maxCapacity); newCapacity = min(newCapacity, maxCapacity); if (newCapacity != elements.length) { elements = Arrays.copyOf(elements, newCapacity); } return newCapacity; } @SuppressWarnings({ "unchecked", "rawtypes" }) DefaultHandle<T> pop() { int size = this.size; if (size == 0) { if (!scavenge()) { return null; } size = this.size; } size --; DefaultHandle ret = elements[size]; elements[size] = null; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } ret.recycleId = 0; ret.lastRecycledId = 0; this.size = size; return ret; } boolean scavenge() { // continue an existing scavenge, if any if (scavengeSome()) { return true; } // reset our scavenge cursor prev = null; cursor = head; return false; } boolean scavengeSome() { WeakOrderQueue prev; WeakOrderQueue cursor = this.cursor; if (cursor == null) { prev = null; cursor = head; if (cursor == null) { return false; } } else { prev = this.prev; } boolean success = false; do { if (cursor.transfer(this)) { success = true; break; } WeakOrderQueue next = cursor.next; if (cursor.owner.get() == null) { // If the thread associated with the queue is gone, unlink it, after // performing a volatile read to confirm there is no data left to collect. // We never unlink the first queue, as we don't want to synchronize on updating the head. if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } } if (prev != null) { prev.setNext(next); } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success; } void push(DefaultHandle<?> item) { Thread currentThread = Thread.currentThread(); if (threadRef.get() == currentThread) { // The current Thread is the thread that belongs to the Stack, we can try to push the object now. pushNow(item); } else { // The current Thread is not the one that belongs to the Stack // (or the Thread that belonged to the Stack was collected already), we need to signal that the push // happens later. pushLater(item, currentThread); } } private void pushNow(DefaultHandle<?> item) { if ((item.recycleId | item.lastRecycledId) != 0) { throw new IllegalStateException("recycled already"); } item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this.size; if (size >= maxCapacity || dropHandle(item)) { // Hit the maximum capacity or should drop - drop the possibly youngest object. return; } if (size == elements.length) { elements = Arrays.copyOf(elements, min(size << 1, maxCapacity)); } elements[size] = item; this.size = size + 1; } private void pushLater(DefaultHandle<?> item, Thread thread) { // we don't want to have a ref to the queue as the value in our weak map // so we null it out; to ensure there are no races with restoring it later // we impose a memory ordering here (no-op on x86) Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this); if (queue == null) { if (delayedRecycled.size() >= maxDelayedQueues) { // Add a dummy queue so we know we should drop the object delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } // Check if we already reached the maximum number of delayed queues and if we can allocate at all. if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { // drop object return; } delayedRecycled.put(this, queue); } else if (queue == WeakOrderQueue.DUMMY) { // drop object return; } queue.add(item); } boolean dropHandle(DefaultHandle<?> handle) { if (!handle.hasBeenRecycled) { if ((++handleRecycleCount & ratioMask) != 0) { // Drop the object. return true; } handle.hasBeenRecycled = true; } return false; } DefaultHandle<T> newHandle() { return new DefaultHandle<T>(this); } } }