/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.buffer;


import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;
import java.util.Queue;

Acts as a thread cache for allocations. This implementation is modeled after jemalloc and the techniques described in Scalable memory allocation using jemalloc.
/** * Acts a Thread cache for allocations. This implementation is moduled after * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the descripted * technics of * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919"> * Scalable memory allocation using jemalloc</a>. */
final class PoolThreadCache { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class); final PoolArena<byte[]> heapArena; final PoolArena<ByteBuffer> directArena; // Hold the caches for the different size classes, which are tiny, small and normal. private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches; private final MemoryRegionCache<byte[]>[] normalHeapCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; // Used for bitshifting when calculate the index of normal caches later private final int numShiftsNormalDirect; private final int numShiftsNormalHeap; private final int freeSweepAllocationThreshold; private int allocations; // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { if (maxCachedBufferCapacity < 0) { throw new IllegalArgumentException("maxCachedBufferCapacity: " + maxCachedBufferCapacity + " (expected: >= 0)"); } this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { // No directArea is 
configured so just null out all caches tinySubPageDirectCaches = null; smallSubPageDirectCaches = null; normalDirectCaches = null; numShiftsNormalDirect = -1; } if (heapArena != null) { // Create the caches for the heap allocations tinySubPageHeapCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageHeapCaches = createSubPageCaches( smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalHeap = log2(heapArena.pageSize); normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); heapArena.numThreadCaches.getAndIncrement(); } else { // No heapArea is configured so just null out all caches tinySubPageHeapCaches = null; smallSubPageHeapCaches = null; normalHeapCaches = null; numShiftsNormalHeap = -1; } // Only check if there are caches in use. if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) && freeSweepAllocationThreshold < 1) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)"); } } private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0 && numCaches > 0) { @SuppressWarnings("unchecked") MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { // TODO: maybe use cacheSize / cache.length cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } } private static <T> MemoryRegionCache<T>[] createNormalCaches( int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { if (cacheSize > 0 && maxCachedBufferCapacity > 0) { int max = Math.min(area.chunkSize, maxCachedBufferCapacity); int arraySize = Math.max(1, log2(max / area.pageSize) + 1); @SuppressWarnings("unchecked") 
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { cache[i] = new NormalMemoryRegionCache<T>(cacheSize); } return cache; } else { return null; } } private static int log2(int val) { int res = 0; while (val > 1) { val >>= 1; res++; } return res; }
Try to allocate a tiny buffer out of the cache. Returns true if successful false otherwise
/**
     * Looks up the tiny-size cache that matches {@code area} and {@code normCapacity} and tries to serve
     * {@code buf} from it.
     *
     * @return {@code true} if the buffer was initialized from a cached entry, {@code false} otherwise
     */
    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        MemoryRegionCache<?> tinyCache = cacheForTiny(area, normCapacity);
        return allocate(tinyCache, buf, reqCapacity);
    }
Try to allocate a small buffer out of the cache. Returns true if successful false otherwise
/**
     * Looks up the small-size cache that matches {@code area} and {@code normCapacity} and tries to serve
     * {@code buf} from it.
     *
     * @return {@code true} if the buffer was initialized from a cached entry, {@code false} otherwise
     */
    boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        MemoryRegionCache<?> smallCache = cacheForSmall(area, normCapacity);
        return allocate(smallCache, buf, reqCapacity);
    }
Try to allocate a normal buffer out of the cache. Returns true if successful, false otherwise.
/** * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise */
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); } @SuppressWarnings({ "unchecked", "rawtypes" }) private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null) { // no cache found so just return false here return false; } boolean allocated = cache.allocate(buf, reqCapacity); if (++ allocations >= freeSweepAllocationThreshold) { allocations = 0; trim(); } return allocated; }
Add PoolChunk and handle to the cache if there is enough room. Returns true if it fit into the cache false otherwise.
/**
     * Offers the given {@link PoolChunk} and {@code handle} to the cache selected by {@code area},
     * {@code normCapacity} and {@code sizeClass}.
     *
     * @return {@code true} if the entry was accepted by the cache, {@code false} if no matching cache exists
     *         or the cache rejected it
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
        MemoryRegionCache<?> target = cache(area, normCapacity, sizeClass);
        return target != null && target.add(chunk, handle);
    }

    /**
     * Dispatches to the cache lookup for the given size class.
     */
    private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
        final MemoryRegionCache<?> selected;
        switch (sizeClass) {
        case Tiny:
            selected = cacheForTiny(area, normCapacity);
            break;
        case Small:
            selected = cacheForSmall(area, normCapacity);
            break;
        case Normal:
            selected = cacheForNormal(area, normCapacity);
            break;
        default:
            throw new Error();
        }
        return selected;
    }
Should be called if the Thread that uses this cache is about to exit, to release resources out of the cache.
/**
     * Should be called if the Thread that uses this cache is about to exit, to release resources out of the
     * cache. Drains every cache, logs the number of freed entries at debug level, and decrements the
     * thread-cache counter of each configured arena.
     */
    void free() {
        int numFreed = 0;
        numFreed += free(tinySubPageDirectCaches);
        numFreed += free(smallSubPageDirectCaches);
        numFreed += free(normalDirectCaches);
        numFreed += free(tinySubPageHeapCaches);
        numFreed += free(smallSubPageHeapCaches);
        numFreed += free(normalHeapCaches);

        if (numFreed > 0 && logger.isDebugEnabled()) {
            logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
                    Thread.currentThread().getName());
        }

        if (directArena != null) {
            directArena.numThreadCaches.getAndDecrement();
        }
        if (heapArena != null) {
            heapArena.numThreadCaches.getAndDecrement();
        }
    }

    /**
     * Drains every cache in the array and returns the total number of freed entries; a {@code null} array
     * counts as zero.
     */
    private static int free(MemoryRegionCache<?>[] caches) {
        int total = 0;
        if (caches != null) {
            for (int i = 0; i < caches.length; i++) {
                total += free(caches[i]);
            }
        }
        return total;
    }

    /**
     * Drains a single cache and returns the number of freed entries; a {@code null} cache counts as zero.
     */
    private static int free(MemoryRegionCache<?> cache) {
        return cache == null ? 0 : cache.free();
    }

    /**
     * Trims every cache held by this thread cache.
     */
    void trim() {
        trim(tinySubPageDirectCaches);
        trim(smallSubPageDirectCaches);
        trim(normalDirectCaches);
        trim(tinySubPageHeapCaches);
        trim(smallSubPageHeapCaches);
        trim(normalHeapCaches);
    }

    /**
     * Trims every cache in the array; a {@code null} array is ignored.
     */
    private static void trim(MemoryRegionCache<?>[] caches) {
        if (caches != null) {
            for (int i = 0; i < caches.length; i++) {
                trim(caches[i]);
            }
        }
    }

    /**
     * Trims a single cache; a {@code null} cache is ignored.
     */
    private static void trim(MemoryRegionCache<?> cache) {
        if (cache != null) {
            cache.trim();
        }
    }

    /**
     * Selects the tiny cache for {@code normCapacity} from the direct or heap caches, depending on the arena.
     */
    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        final int tinyIndex = PoolArena.tinyIdx(normCapacity);
        if (!area.isDirect()) {
            return cache(tinySubPageHeapCaches, tinyIndex);
        }
        return cache(tinySubPageDirectCaches, tinyIndex);
    }

    /**
     * Selects the small cache for {@code normCapacity} from the direct or heap caches, depending on the arena.
     */
    private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
        final int smallIndex = PoolArena.smallIdx(normCapacity);
        if (!area.isDirect()) {
            return cache(smallSubPageHeapCaches, smallIndex);
        }
        return cache(smallSubPageDirectCaches, smallIndex);
    }

    /**
     * Selects the normal cache for {@code normCapacity}. The index is {@code log2} of the capacity shifted
     * right by the arena-specific shift computed in the constructor.
     */
    private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
        if (!area.isDirect()) {
            return cache(normalHeapCaches, log2(normCapacity >> numShiftsNormalHeap));
        }
        return cache(normalDirectCaches, log2(normCapacity >> numShiftsNormalDirect));
    }

    /**
     * Returns {@code cache[idx]}, or {@code null} if the array is {@code null} or {@code idx} lies past its end.
     */
    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
        if (cache != null && idx < cache.length) {
            return cache[idx];
        }
        return null;
    }
Cache used for buffers which are backed by TINY or SMALL size.
/**
     * {@link MemoryRegionCache} for TINY and SMALL sized buffers; cached entries are turned back into buffers
     * through {@link PoolChunk#initBufWithSubpage}.
     */
    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {

        SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
        }

        @Override
        protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBufWithSubpage(buf, handle, reqCapacity);
        }
    }
Cache used for buffers which are backed by NORMAL size.
/**
     * {@link MemoryRegionCache} for NORMAL sized buffers; cached entries are turned back into buffers through
     * {@link PoolChunk#initBuf}.
     */
    private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {

        NormalMemoryRegionCache(int size) {
            super(size, SizeClass.Normal);
        }

        @Override
        protected void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBuf(buf, handle, reqCapacity);
        }
    }

    /**
     * Queue of cached (chunk, handle) pairs for a single size class.
     */
    private abstract static class MemoryRegionCache<T> {
        // Requested size passed through MathUtil.safeFindNextPositivePowerOfTwo; also the queue capacity argument.
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        // Successful allocations since the last trim().
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.sizeClass = sizeClass;
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            queue = PlatformDependent.newFixedMpscQueue(this.size);
        }
Init the PooledByteBuf using the provided chunk and handle with the capacity restrictions.
/**
         * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
         * Called by {@code allocate} with the chunk and handle of the entry that was just taken from the queue.
         *
         * @param chunk       the chunk stored in the cached entry
         * @param handle      the handle stored in the cached entry
         * @param buf         the buffer to initialize
         * @param reqCapacity the capacity requested by the caller
         */
protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);
Add to cache if not already full.
/** * Add to cache if not already full. */
@SuppressWarnings("unchecked") public final boolean add(PoolChunk<T> chunk, long handle) { Entry<T> entry = newEntry(chunk, handle); boolean queued = queue.offer(entry); if (!queued) { // If it was not possible to cache the chunk, immediately recycle the entry entry.recycle(); } return queued; }
Allocate something out of the cache if possible and remove the entry from the cache.
/** * Allocate something out of the cache if possible and remove the entry from the cache. */
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { Entry<T> entry = queue.poll(); if (entry == null) { return false; } initBuf(entry.chunk, entry.handle, buf, reqCapacity); entry.recycle(); // allocations is not thread-safe which is fine as this is only called from the same thread all time. ++ allocations; return true; }
Clear out this cache and free up all previous cached PoolChunks and handles.
/** * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. */
public final int free() { return free(Integer.MAX_VALUE); } private int free(int max) { int numFreed = 0; for (; numFreed < max; numFreed++) { Entry<T> entry = queue.poll(); if (entry != null) { freeEntry(entry); } else { // all cleared return numFreed; } } return numFreed; }
Free up cached PoolChunks if not allocated frequently enough.
/** * Free up cached {@link PoolChunk}s if not allocated frequently enough. */
public final void trim() { int free = size - allocations; allocations = 0; // We not even allocated all the number that are if (free > 0) { free(free); } } @SuppressWarnings({ "unchecked", "rawtypes" }) private void freeEntry(Entry entry) { PoolChunk chunk = entry.chunk; long handle = entry.handle; // recycle now so PoolChunk can be GC'ed. entry.recycle(); chunk.arena.freeChunk(chunk, handle, sizeClass); } static final class Entry<T> { final Handle<Entry<?>> recyclerHandle; PoolChunk<T> chunk; long handle = -1; Entry(Handle<Entry<?>> recyclerHandle) { this.recyclerHandle = recyclerHandle; } void recycle() { chunk = null; handle = -1; recyclerHandle.recycle(this); } } @SuppressWarnings("rawtypes") private static Entry newEntry(PoolChunk<?> chunk, long handle) { Entry entry = RECYCLER.get(); entry.chunk = chunk; entry.handle = handle; return entry; } @SuppressWarnings("rawtypes") private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { @SuppressWarnings("unchecked") @Override protected Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }; } }