/*
 * Copyright 2015 Terracotta, Inc., a Software AG company.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.terracotta.offheapstore;

import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.terracotta.offheapstore.buffersource.BufferSource;
import org.terracotta.offheapstore.exceptions.OversizeMappingException;
import org.terracotta.offheapstore.paging.Page;
import org.terracotta.offheapstore.paging.PageSource;
import org.terracotta.offheapstore.storage.BinaryStorageEngine;
import org.terracotta.offheapstore.storage.StorageEngine;
import org.terracotta.offheapstore.util.DebuggingUtils;
import org.terracotta.offheapstore.util.FindbugsSuppressWarnings;
import org.terracotta.offheapstore.util.NoOpLock;
import org.terracotta.offheapstore.util.WeakIdentityHashMap;

import static org.terracotta.offheapstore.MetadataTuple.metadataTuple;

A hash-table implementation whose table is stored in an NIO direct buffer.

The map stores keys and values encoded as integers, in an open-addressed linear-reprobing hashtable. Entries are 16 bytes wide, and consist of:

  • int status marker
  • int cached key hashcode
  • long {key:value} representation
Key and value representations are generated by the StorageEngine instance provided at construction time.

This Map implementation is not thread-safe and does not support null keys or values.

Author: Chris Dennis
Type parameters:
  • <K> – the type of keys maintained by this map
  • <V> – the type of mapped values
/**
 * A hash-table implementation whose table is stored in an NIO direct buffer.
 * <p>
 * The map stores keys and values encoded as integers, in an open-addressed
 * linear-reprobing hashtable. Entries are 16-bytes wide, and consist of:
 * <ul>
 * <li><code>int</code> status marker</li>
 * <li><code>int</code> cached key hashcode</li>
 * <li><code>long</code> {key:value} representation</li>
 * </ul>
 * Key and value representations are generated by the {@link StorageEngine}
 * instance provided at construction time.
 * <p>
 * This {@link Map} implementation is not thread-safe and does not support null
 * keys or values.
 *
 * @param <K> the type of keys maintained by this map
 * @param <V> the type of mapped values
 *
 * @author Chris Dennis
 */
public class OffHeapHashMap<K, V> extends AbstractMap<K, V> implements MapInternals, StorageEngine.Owner, HashingMap<K, V> {

  /*
   * Future design ideas:
   *
   * We might want to look in to reading the whole (or large chunks) of the
   * probe sequence for a key in one shot rather than doing it one by one.
   */

  private static final Logger LOGGER = LoggerFactory.getLogger(OffHeapHashMap.class);

  /* Default initial table size (in slots) used by the constructors that take no explicit size. */
  private static final int INITIAL_TABLE_SIZE = 128;
  /* Load factor above which tryExpand() grows the table instead of lengthening the reprobe sequence. */
  private static final float TABLE_RESIZE_THRESHOLD = 0.5f;
  /* Initial value of the load factor at or below which shrink() attempts to shrink the table. */
  private static final float TABLE_SHRINK_THRESHOLD = 0.2f;
  /* Initial value of reprobeLimit. */
  private static final int INITIAL_REPROBE_LENGTH = 16;
  /* tryIncreaseReprobe() logs at WARN (rather than DEBUG) once the new reprobe limit reaches this value. */
  private static final int REPROBE_WARNING_THRESHOLD = 1024;
  /*
   * On clear, a fresh initial-size table is allocated only if the current
   * table is more than this many times larger than the initial size.
   */
  private static final int ALLOCATE_ON_CLEAR_THRESHOLD_RATIO = 2;

  /* Zero-capacity sentinel installed as the hashtable by destroy(); compared by identity. */
  private static final IntBuffer DESTROYED_TABLE = IntBuffer.allocate(0);
Size of a table entry in primitive int units
/**
 * Size of a table entry in primitive {@code int} units
 */
protected static final int ENTRY_SIZE = 4;
/* log2(ENTRY_SIZE): shift that converts a slot number into an int offset within the table. */
protected static final int ENTRY_BIT_SHIFT = Integer.numberOfTrailingZeros(ENTRY_SIZE);

/* Offsets (in ints) of the fields within a single table entry. */
protected static final int STATUS = 0;
protected static final int KEY_HASHCODE = 1;
/* The encoding is a long, occupying offsets ENCODING (high int) and ENCODING + 1 (low int). */
protected static final int ENCODING = 2;

/* Status bits: USED marks a live entry, REMOVED marks a slot whose entry was removed. */
protected static final int STATUS_USED = 1;
protected static final int STATUS_REMOVED = 2;
/* Status bits owned by the map itself; caller-supplied metadata must not overlap them. */
public static final int RESERVED_STATUS_BITS = STATUS_USED | STATUS_REMOVED;

/* Engine that encodes/decodes keys and values; never null (checked in the constructor). */
protected final StorageEngine<? super K, ? super V> storageEngine;

/* Source of the pages backing the hash table. */
protected final PageSource tableSource;

/*
 * Retired tables whose pages have not yet been freed, keyed by the table's
 * IntBuffer. The callback frees the page of a pending entry
 * (NOTE(review): presumably invoked when the map reaps an entry - confirm
 * against WeakIdentityHashMap).
 */
private final WeakIdentityHashMap<IntBuffer, PendingPage> pendingTableFrees = new WeakIdentityHashMap<>(pending -> freeTable(pending.tablePage));

/* Initial table size in slots, rounded up to a power of two by the constructor. */
private final int initialTableSize;

/* Set from the constructor argument; its use is outside the visible portion of this class. */
private final boolean tableAllocationsSteal;

/* Per-thread flag set while a table expand/shrink is in progress, used to detect re-entrant resizes. */
private final ThreadLocal<Boolean> tableResizing = ThreadLocal.withInitial(() -> Boolean.FALSE);

/* Number of mappings reported by size(). */
protected volatile int size;

/* Modification counter; incremented by clear() in the visible code. */
protected volatile int modCount;

/*
 * The reprobe limit as it currently stands only ever increases. If we change
 * this behavior we will need to make changes to the iterators as they assume
 * this to be true.
 */
protected int reprobeLimit = INITIAL_REPROBE_LENGTH;

/* Current shrink threshold: halved after a failed shrink, reset to TABLE_SHRINK_THRESHOLD after a successful one. */
private float currentTableShrinkThreshold = TABLE_SHRINK_THRESHOLD;

/* Not referenced in the visible portion of this class. */
private volatile boolean hasUsedIterators;
The current hash-table.

A list of: int[] {status, hashCode, encoding-high, encoding-low}

/**
 * The current hash-table.
 * <p>
 * A list of: {@code int[] {status, hashCode, encoding-high, encoding-low}}
 */
protected volatile IntBuffer hashtable;

/* The page backing {@link #hashtable}; set to null by destroy(). */
protected volatile Page hashTablePage;

/* Lazily created views, cached by entrySet(), keySet() and encodingSet(). */
private Set<Entry<K, V>> entrySet;
private Set<K> keySet;
private Set<Long> encodingSet;

/*
 * Statistics support
 */
/* Count of slots currently marked removed; reset to zero whenever the table is rebuilt or cleared. */
protected volatile int removedSlots;
Construct an instance using a custom BufferSource for the hashtable.
Params:
  • source – source for the hashtable allocations
  • storageEngine – engine used to encode the keys and values
/**
 * Construct an instance using a custom {@link PageSource} for the
 * hashtable.
 *
 * @param source source for the hashtable allocations
 * @param storageEngine engine used to encode the keys and values
 */
public OffHeapHashMap(PageSource source, StorageEngine<? super K, ? super V> storageEngine) {
  this(source, storageEngine, INITIAL_TABLE_SIZE);
}

/**
 * Construct an instance with the default initial table size and an explicit
 * table-allocation stealing flag.
 *
 * @param source source for the hashtable allocations
 * @param tableAllocationsSteal stored flag for table allocations
 * @param storageEngine engine used to encode the keys and values
 */
public OffHeapHashMap(PageSource source, boolean tableAllocationsSteal, StorageEngine<? super K, ? super V> storageEngine) {
  this(source, tableAllocationsSteal, storageEngine, INITIAL_TABLE_SIZE);
}

/**
 * Construct an instance with the default initial table size, optionally
 * skipping the initial table allocation.
 *
 * @param source source for the hashtable allocations
 * @param storageEngine engine used to encode the keys and values
 * @param bootstrap {@code true} to allocate the initial table immediately
 */
public OffHeapHashMap(PageSource source, StorageEngine<? super K, ? super V> storageEngine, boolean bootstrap) {
  this(source, false, storageEngine, INITIAL_TABLE_SIZE, bootstrap);
}
Construct an instance using a custom BufferSource for the hashtable and a custom initial table size.
Params:
  • source – source for the hashtable allocations
  • storageEngine – engine used to encode the keys and values
  • tableSize – the initial table size
/** * Construct an instance using a custom {@link BufferSource} for the * hashtable and a custom initial table size. * * @param source source for the hashtable allocations * @param storageEngine engine used to encode the keys and values * @param tableSize the initial table size */
public OffHeapHashMap(PageSource source, StorageEngine<? super K, ? super V> storageEngine, int tableSize) { this(source, false, storageEngine, tableSize, true); } public OffHeapHashMap(PageSource source, boolean tableAllocationsSteal, StorageEngine<? super K, ? super V> storageEngine, int tableSize) { this(source, tableAllocationsSteal, storageEngine, tableSize, true); } @FindbugsSuppressWarnings("ICAST_INTEGER_MULTIPLY_CAST_TO_LONG") protected OffHeapHashMap(PageSource source, boolean tableAllocationsSteal, StorageEngine<? super K, ? super V> storageEngine, int tableSize, boolean bootstrap) { if (storageEngine == null) { throw new NullPointerException("StorageEngine implementation must be non-null"); } this.storageEngine = storageEngine; this.tableSource = source; this.tableAllocationsSteal = tableAllocationsSteal; // Find a power of 2 >= initialCapacity int capacity = 1; while (capacity < tableSize) { capacity <<= 1; } this.initialTableSize = capacity; if (bootstrap) { this.hashTablePage = allocateTable(initialTableSize); if (hashTablePage == null) { String msg = "Initial table allocation failed.\n" + "Initial Table Size (slots) : " + initialTableSize + '\n' + "Allocation Will Require : " + DebuggingUtils.toBase2SuffixedString(initialTableSize * ENTRY_SIZE * (Integer.SIZE / Byte.SIZE)) + "B\n" + "Table Page Source : " + tableSource; throw new IllegalArgumentException(msg); } hashtable = hashTablePage.asIntBuffer(); } this.storageEngine.bind(this); } @Override public int size() { return size; } @Override public boolean containsKey(Object key) { int hash = key.hashCode(); if (size == 0) { return false; } IntBuffer view = (IntBuffer) hashtable.duplicate().position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!view.hasRemaining()) { view.rewind(); } IntBuffer entry = (IntBuffer) view.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return false; } else if (isPresent(entry) && keyEquals(key, hash, readLong(entry, 
ENCODING), entry.get(KEY_HASHCODE))) { hit(entry); return true; } else { view.position(view.position() + ENTRY_SIZE); } } return false; } @SuppressWarnings("unchecked") @Override public V get(Object key) { int hash = key.hashCode(); if (size == 0) { return null; } IntBuffer view = (IntBuffer) hashtable.duplicate().position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!view.hasRemaining()) { view.rewind(); } IntBuffer entry = (IntBuffer) view.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { hit(entry); return (V) storageEngine.readValue(readLong(entry, ENCODING)); } else { view.position(view.position() + ENTRY_SIZE); } } return null; } @Override public Long getEncodingForHashAndBinary(int hash, ByteBuffer binaryKey) { if (size == 0) { return null; } IntBuffer view = (IntBuffer) hashtable.duplicate().position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!view.hasRemaining()) { view.rewind(); } IntBuffer entry = (IntBuffer) view.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && binaryKeyEquals(binaryKey, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { return readLong(entry, ENCODING); } else { view.position(view.position() + ENTRY_SIZE); } } return null; } @Override @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") public long installMappingForHashAndEncoding(int pojoHash, ByteBuffer offheapBinaryKey, ByteBuffer offheapBinaryValue, int metadata) { freePendingTables(); int[] newEntry = installEntry(offheapBinaryKey, pojoHash, offheapBinaryValue, metadata); int start = indexFor(spread(pojoHash)); hashtable.position(start); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) 
hashtable.slice().limit(ENTRY_SIZE); if (isAvailable(entry)) { if (isRemoved(entry)) { removedSlots--; } entry.put(newEntry); slotAdded(entry); hit(entry); return readLong(newEntry, ENCODING); } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], false); //XXX: further contemplate the boolean value here // hit reprobe limit - must rehash expand(start, limit); return installMappingForHashAndEncoding(pojoHash, offheapBinaryKey, offheapBinaryValue, metadata); } public Integer getMetadata(Object key, int mask) { int safeMask = mask & ~RESERVED_STATUS_BITS; freePendingTables(); int hash = key.hashCode(); hashtable.position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); long encoding = readLong(entry, ENCODING); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && keyEquals(key, hash, encoding, entry.get(KEY_HASHCODE))) { return entry.get(STATUS) & safeMask; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } return null; } public Integer getAndSetMetadata(Object key, int mask, int values) { int safeMask = mask & ~RESERVED_STATUS_BITS; freePendingTables(); int hash = key.hashCode(); hashtable.position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); long encoding = readLong(entry, ENCODING); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && keyEquals(key, hash, encoding, entry.get(KEY_HASHCODE))) { int previous = entry.get(STATUS); entry.put(STATUS, (previous & ~safeMask) | (values & safeMask)); return previous & safeMask; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } return null; } 
public V getValueAndSetMetadata(Object key, int mask, int values) { int safeMask = mask & ~RESERVED_STATUS_BITS; freePendingTables(); int hash = key.hashCode(); hashtable.position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); long encoding = readLong(entry, ENCODING); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && keyEquals(key, hash, encoding, entry.get(KEY_HASHCODE))) { hit(entry); entry.put(STATUS, (entry.get(STATUS) & ~safeMask) | (values & safeMask)); @SuppressWarnings("unchecked") V result = (V) storageEngine.readValue(readLong(entry, ENCODING)); return result; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } return null; } @Override public V put(K key, V value) { return put(key, value, 0); } @SuppressWarnings("unchecked") @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") public V put(K key, V value, int metadata) { freePendingTables(); int hash = key.hashCode(); int[] newEntry = writeEntry(key, hash, value, metadata); int start = indexFor(spread(hash)); hashtable.position(start); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isAvailable(entry)) { storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, metadata); storageEngine.invalidateCache(); for (IntBuffer laterEntry = entry; i < limit; i++) { if (isTerminating(laterEntry)) { break; } else if (isPresent(laterEntry) && keyEquals(key, hash, readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE))) { V old = (V) storageEngine.readValue(readLong(laterEntry, ENCODING)); storageEngine.freeMapping(readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE), false); long oldEncoding = readLong(laterEntry, ENCODING); laterEntry.put(newEntry); 
slotUpdated((IntBuffer) laterEntry.flip(), oldEncoding); hit(laterEntry); return old; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } if (!hashtable.hasRemaining()) { hashtable.rewind(); } laterEntry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); } if (isRemoved(entry)) { removedSlots--; } entry.put(newEntry); slotAdded(entry); hit(entry); return null; } else if (keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, metadata); storageEngine.invalidateCache(); V old = (V) storageEngine.readValue(readLong(entry, ENCODING)); storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), false); long oldEncoding = readLong(entry, ENCODING); entry.put(newEntry); slotUpdated((IntBuffer) entry.flip(), oldEncoding); hit(entry); return old; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], false); //XXX: further contemplate the boolean value here // hit reprobe limit - must rehash expand(start, limit); return put(key, value, metadata); }
Associates the specified value with the specified key in this map. If the map does not contain a mapping for the key, the new mapping is only installed if there is room. If the map previously contained a mapping for the key, the old value is replaced by the specified value even if this results in a failure or eviction.
Params:
  • key – key with which the specified value is to be associated
  • value – value to be associated with the specified key
Returns: the previous value associated with key, or null if there was no mapping for key (irrespective of whether the value was successfully installed).
/** * Associates the specified value with the specified key in this map. If the * map does not contain a mapping for the key, the new mapping is only * installed if there is room. If the map previously contained a mapping for * the key, the old value is replaced by the specified value even if this * results in a failure or eviction. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with <tt>key</tt>, or * <tt>null</tt> if there was no mapping for <tt>key</tt> * (irrespective of whether the value was successfully installed). */
public V fill(K key, V value) { return fill(key, value, 0); } @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") public V fill(K key, V value, int metadata) { freePendingTables(); int hash = key.hashCode(); int start = indexFor(spread(hash)); hashtable.position(start); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isAvailable(entry)) { for (IntBuffer laterEntry = entry; i < limit; i++) { if (isTerminating(laterEntry)) { break; } else if (isPresent(laterEntry) && keyEquals(key, hash, readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE))) { return put(key, value, metadata); } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } if (!hashtable.hasRemaining()) { hashtable.rewind(); } laterEntry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); } int[] newEntry = tryWriteEntry(key, hash, value, metadata); if (newEntry == null) { return null; } else { return fill(key, value, hash, newEntry, metadata); } } else if (keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { return put(key, value, metadata); } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } // hit reprobe limit - must rehash if (tryExpandTable()) { return fill(key, value, metadata); } else { return null; } } @SuppressWarnings("unchecked") @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") protected final V fill(K key, V value, int hash, int[] newEntry, int metadata) { freePendingTables(); int start = indexFor(spread(hash)); hashtable.position(start); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isAvailable(entry)) { storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, metadata); storageEngine.invalidateCache(); for (IntBuffer laterEntry = entry; i < limit; i++) 
{ if (isTerminating(laterEntry)) { break; } else if (isPresent(laterEntry) && keyEquals(key, hash, readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE))) { V old = (V) storageEngine.readValue(readLong(laterEntry, ENCODING)); storageEngine.freeMapping(readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE), false); long oldEncoding = readLong(laterEntry, ENCODING); laterEntry.put(newEntry); slotUpdated((IntBuffer) laterEntry.flip(), oldEncoding); hit(laterEntry); return old; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } if (!hashtable.hasRemaining()) { hashtable.rewind(); } laterEntry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); } if (isRemoved(entry)) { removedSlots--; } entry.put(newEntry); slotAdded(entry); hit(entry); return null; } else if (keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, metadata); storageEngine.invalidateCache(); V old = (V) storageEngine.readValue(readLong(entry, ENCODING)); storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), false); long oldEncoding = readLong(entry, ENCODING); entry.put(newEntry); slotUpdated((IntBuffer) entry.flip(), oldEncoding); hit(entry); return old; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], true); return null; } private int[] writeEntry(K key, int hash, V value, int metadata) { /* * TODO If we start supporting remapping (compaction) then we'll need to * worry about correcting the key representation here if the value write * triggers a remapping operation. 
*/ while (true) { int[] entry = tryWriteEntry(key, hash, value, metadata); if (entry == null) { storageEngineFailure(key); } else { return entry; } } } @FindbugsSuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS") private int[] tryWriteEntry(K key, int hash, V value, int metadata) { if (hashtable == null) { throw new NullPointerException(); } else if (hashtable == DESTROYED_TABLE) { throw new IllegalStateException("Offheap map/cache has been destroyed"); } else if ((metadata & RESERVED_STATUS_BITS) == 0) { Long encoding = storageEngine.writeMapping(key, value, hash, metadata); if (encoding == null) { return null; } else { return createEntry(hash, encoding, metadata); } } else { throw new IllegalArgumentException("Invalid metadata for key '" + key + "' : " + Integer.toBinaryString(metadata)); } } private int[] installEntry(ByteBuffer offheapBinaryKey, int pojoHash, ByteBuffer offheapBinaryValue, int metadata) { while (true) { int [] entry = tryInstallEntry(offheapBinaryKey, pojoHash, offheapBinaryValue, metadata); if (entry == null) { storageEngineFailure("<binary-key>"); } else { return entry; } } } @FindbugsSuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS") private int[] tryInstallEntry(ByteBuffer offheapBinaryKey, int pojoHash, ByteBuffer offheapBinaryValue, int metadata) { if (hashtable == null) { throw new NullPointerException(); } else if (hashtable == DESTROYED_TABLE) { throw new IllegalStateException("Offheap map/cache has been destroyed"); } else if ((metadata & RESERVED_STATUS_BITS) == 0) { Long encoding = ((BinaryStorageEngine) storageEngine).writeBinaryMapping(offheapBinaryKey, offheapBinaryValue, pojoHash, metadata); if (encoding == null) { return null; } else { return createEntry(pojoHash, encoding, metadata); } } else { throw new IllegalArgumentException("Invalid metadata for binary key : " + Integer.toBinaryString(metadata)); } } private static int[] createEntry(int hash, long encoding, int metadata) { return new int[] { STATUS_USED | metadata, hash, 
(int) (encoding >>> Integer.SIZE), (int) encoding }; } @Override public V remove(Object key) { freePendingTables(); int hash = key.hashCode(); if (size == 0) { return null; } hashtable.position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { @SuppressWarnings("unchecked") V removedValue = (V) storageEngine.readValue(readLong(entry, ENCODING)); storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), true); /* * TODO We might want to track the number of 'removed' slots in the * table, and rehash it if we reach some threshold to avoid lookup costs * staying artificially high when the table is relatively empty, but full * of 'removed' slots. This situation should be relatively rare for * normal cache usage - but might be more common for more map like usage * patterns. * * The more severe versions of this pattern are now handled by the table * shrinking when the occupation drops below the shrink threshold, as * that will rehash the table. 
*/ entry.put(STATUS, STATUS_REMOVED); slotRemoved(entry); shrink(); return removedValue; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } return null; } @Override public Map<K, V> removeAllWithHash(int hash) { freePendingTables(); if (size == 0) { return Collections.emptyMap(); } Map<K, V> removed = new HashMap<>(); hashtable.position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return removed; } else if (isPresent(entry) && hash == entry.get(KEY_HASHCODE)) { @SuppressWarnings("unchecked") V removedValue = (V) storageEngine.readValue(readLong(entry, ENCODING)); @SuppressWarnings("unchecked") K removedKey = (K) storageEngine.readKey(readLong(entry, ENCODING), hash); storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), true); removed.put(removedKey, removedValue); /* * TODO We might want to track the number of 'removed' slots in the * table, and rehash it if we reach some threshold to avoid lookup costs * staying artificially high when the table is relatively empty, but full * of 'removed' slots. This situation should be relatively rare for * normal cache usage - but might be more common for more map like usage * patterns. * * The more severe versions of this pattern are now handled by the table * shrinking when the occupation drops below the shrink threshold, as * that will rehash the table. 
*/ entry.put(STATUS, STATUS_REMOVED); slotRemoved(entry); } hashtable.position(hashtable.position() + ENTRY_SIZE); } shrink(); return removed; } public boolean removeNoReturn(Object key) { freePendingTables(); int hash = key.hashCode(); hashtable.position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return false; } else if (isPresent(entry) && keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), true); /* * TODO We might want to track the number of 'removed' slots in the * table, and rehash it if we reach some threshold to avoid lookup costs * staying artificially high when the table is relatively empty, but full * of 'removed' slots. This situation should be relatively rare for * normal cache usage - but might be more common for more map like usage * patterns. * * The more severe versions of this pattern are now handled by the table * shrinking when the occupation drops below the shrink threshold, as * that will rehash the table. 
*/ entry.put(STATUS, STATUS_REMOVED); slotRemoved(entry); shrink(); return true; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } return false; } @Override @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") public void clear() { if (hashtable != DESTROYED_TABLE) { freePendingTables(); modCount++; removedSlots = 0; size = 0; storageEngine.clear(); allocateOrClearTable(initialTableSize); } } public void destroy() { removedSlots = 0; size = 0; freeTable(hashTablePage); for (Iterator<PendingPage> it = pendingTableFrees.values(); it.hasNext(); freeTable(it.next().tablePage)); hashTablePage = null; hashtable = DESTROYED_TABLE; storageEngine.destroy(); } private void allocateOrClearTable(int size) { int[] zeros = new int[1024 >> 2]; hashtable.clear(); while (hashtable.hasRemaining()) { if (hashtable.remaining() < zeros.length) { hashtable.put(zeros, 0, hashtable.remaining()); } else { hashtable.put(zeros); } } hashtable.clear(); wipePendingTables(); if (hashtable.capacity() > size * ENTRY_SIZE * ALLOCATE_ON_CLEAR_THRESHOLD_RATIO) { Page newTablePage = allocateTable(size); if (newTablePage != null) { freeTable(hashTablePage, hashtable, reprobeLimit()); hashTablePage = newTablePage; hashtable = newTablePage.asIntBuffer(); } } } @Override public final Set<Entry<K, V>> entrySet() { Set<Entry<K, V>> es = entrySet; return es == null ? (entrySet = createEntrySet()) : es; } @Override public final Set<Long> encodingSet() { Set<Long> es = encodingSet; return es == null ? (encodingSet = createEncodingSet()) : es; } @Override public final Set<K> keySet() { Set<K> ks = keySet; return ks == null ? 
(keySet = createKeySet()) : ks; } protected Set<Entry<K, V>> createEntrySet() { return new EntrySet(); } protected Set<Long> createEncodingSet() { return new EncodingSet(); } protected Set<K> createKeySet() { return new KeySet(); } protected static boolean isPresent(IntBuffer entry) { return (entry.get(STATUS) & STATUS_USED) != 0; } protected static boolean isAvailable(IntBuffer entry) { return (entry.get(STATUS) & STATUS_USED) == 0; } protected static boolean isTerminating(IntBuffer entry) { return isTerminating(entry.get(STATUS)); } protected static boolean isTerminating(int entryStatus) { return (entryStatus & (STATUS_USED | STATUS_REMOVED)) == 0; } protected static boolean isRemoved(IntBuffer entry) { return isRemoved(entry.get(STATUS)); } protected static boolean isRemoved(int entryStatus) { return (entryStatus & STATUS_REMOVED) != 0; } protected static long readLong(int[] array, int offset) { return (((long) array[offset]) << Integer.SIZE) | (0xffffffffL & array[offset + 1]); } protected static long readLong(IntBuffer entry, int offset) { return (((long) entry.get(offset)) << Integer.SIZE) | (0xffffffffL & entry.get(offset + 1)); } protected int indexFor(int hash) { return indexFor(hash, hashtable); } protected static int indexFor(int hash, IntBuffer table) { return (hash << ENTRY_BIT_SHIFT) & Math.max(0, table.capacity() - 1); } private boolean keyEquals(Object probeKey, int probeHash, long targetEncoding, int targetHash) { return probeHash == targetHash && storageEngine.equalsKey(probeKey, targetEncoding); } private boolean binaryKeyEquals(ByteBuffer binaryProbeKey, int probeHash, long targetEncoding, int targetHash) { if (storageEngine instanceof BinaryStorageEngine) { return probeHash == targetHash && ((BinaryStorageEngine) storageEngine).equalsBinaryKey(binaryProbeKey, targetEncoding); } else { throw new UnsupportedOperationException("Cannot check binary quality unless configured with a BinaryStorageEngine"); } } private void expand(int start, int 
length) { if (!tryExpand()) { tableExpansionFailure(start, length); } } private boolean tryExpand() { if (((float) size) / getTableCapacity() > TABLE_RESIZE_THRESHOLD) { return tryExpandTable(); } else { return tryIncreaseReprobe(); } } private boolean tryExpandTable() { if (tableResizing.get()) { throw new AssertionError("Expand requested in context of an existing resize - this should be impossible"); } else { tableResizing.set(Boolean.TRUE); try { Page newTablePage = expandTable(1); if (newTablePage == null) { return false; } else { freeTable(hashTablePage, hashtable, reprobeLimit()); hashTablePage = newTablePage; hashtable = newTablePage.asIntBuffer(); removedSlots = 0; return true; } } finally { tableResizing.remove(); } } } private Page expandTable(int scale) { if (hashtable == DESTROYED_TABLE) { throw new IllegalStateException("This map/cache has been destroyed"); } /* Increase the size of the table to accommodate more entries */ int newsize = hashtable.capacity() << scale; /* Check we're not hitting max capacity */ if (newsize <= 0) { return null; } long startTime = -1; if (LOGGER.isDebugEnabled()) { startTime = System.nanoTime(); int slots = hashtable.capacity() / ENTRY_SIZE; int newslots = newsize / ENTRY_SIZE; LOGGER.debug("Expanding table from {} slots to {} slots [load-factor={}]", DebuggingUtils.toBase2SuffixedString(slots), DebuggingUtils.toBase2SuffixedString(newslots), ((float) size) / slots); } Page newTablePage = allocateTable(newsize / ENTRY_SIZE); if (newTablePage == null) { return null; } IntBuffer newTable = newTablePage.asIntBuffer(); for (hashtable.clear(); hashtable.hasRemaining(); hashtable.position(hashtable.position() + ENTRY_SIZE)) { IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isPresent(entry) && !writeEntry(newTable, entry)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Table expansion from {} slots to {} slots abandoned - not enough table space", DebuggingUtils.toBase2SuffixedString(hashtable.capacity() / 
ENTRY_SIZE), DebuggingUtils.toBase2SuffixedString(newsize / ENTRY_SIZE)); } freeTable(newTablePage); return expandTable(scale + 1); } } if (LOGGER.isDebugEnabled()) { long time = System.nanoTime() - startTime; LOGGER.debug("Table expansion from {} slots to {} slots complete : took {}ms", DebuggingUtils.toBase2SuffixedString(hashtable.capacity() / ENTRY_SIZE), DebuggingUtils.toBase2SuffixedString(newsize / ENTRY_SIZE), ((float) time) / 1000000); } return newTablePage; } protected boolean tryIncreaseReprobe() { if (reprobeLimit() >= getTableCapacity()) { return false; } else { int newReprobeLimit = reprobeLimit() << 1; if (newReprobeLimit >= REPROBE_WARNING_THRESHOLD) { long slots = getTableCapacity(); LOGGER.warn("Expanding reprobe sequence from {} slots to {} slots [load-factor={}]", reprobeLimit(), newReprobeLimit, ((float) size) / slots); } else if (LOGGER.isDebugEnabled()) { long slots = getTableCapacity(); LOGGER.debug("Expanding reprobe sequence from {} slots to {} slots [load-factor={}]", reprobeLimit(), newReprobeLimit, ((float) size) / slots); } reprobeLimit = newReprobeLimit; return true; } } protected void shrinkTable() { shrink(); } private void shrink() { if (((float) size) / getTableCapacity() <= currentTableShrinkThreshold) { shrinkTableImpl(); } } private void shrinkTableImpl() { if (tableResizing.get()) { LOGGER.debug("Shrink request ignored in the context of an in-process expand - likely self stealing"); } else { tableResizing.set(Boolean.TRUE); try { float shrinkRatio = (TABLE_RESIZE_THRESHOLD * getTableCapacity()) / size; int shrinkShift = Integer.numberOfTrailingZeros(Integer.highestOneBit(Math.max(2, (int) shrinkRatio))); Page newTablePage = shrinkTableImpl(shrinkShift); if (newTablePage == null) { currentTableShrinkThreshold = currentTableShrinkThreshold / 2; } else { currentTableShrinkThreshold = TABLE_SHRINK_THRESHOLD; freeTable(hashTablePage, hashtable, reprobeLimit()); hashTablePage = newTablePage; hashtable = newTablePage.asIntBuffer(); 
removedSlots = 0; } } finally { tableResizing.remove(); } } } private Page shrinkTableImpl(int scale) { /* Increase the size of the table to accommodate more entries */ int newsize = hashtable.capacity() >>> scale; /* Check we're not hitting zero capacity */ if (newsize < ENTRY_SIZE) { if (scale > 1) { return shrinkTableImpl(scale - 1); } else { return null; } } long startTime = -1; if (LOGGER.isDebugEnabled()) { startTime = System.nanoTime(); int slots = hashtable.capacity() / ENTRY_SIZE; int newslots = newsize / ENTRY_SIZE; LOGGER.debug("Shrinking table from {} slots to {} slots [load-factor={}]", DebuggingUtils.toBase2SuffixedString(slots), DebuggingUtils.toBase2SuffixedString(newslots), ((float) size) / slots); } Page newTablePage = allocateTable(newsize / ENTRY_SIZE); if (newTablePage == null) { return null; } IntBuffer newTable = newTablePage.asIntBuffer(); for (hashtable.clear(); hashtable.hasRemaining(); hashtable.position(hashtable.position() + ENTRY_SIZE)) { IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isPresent(entry) && !writeEntry(newTable, entry)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Table shrinking from {} slots to {} slots abandoned - too little table space", DebuggingUtils.toBase2SuffixedString(hashtable.capacity() / ENTRY_SIZE), DebuggingUtils.toBase2SuffixedString(newsize / ENTRY_SIZE)); } freeTable(newTablePage); if (scale > 1) { return shrinkTableImpl(scale - 1); } else { hashtable.clear(); return null; } } } if (LOGGER.isDebugEnabled()) { long time = System.nanoTime() - startTime; LOGGER.debug("Table shrinking from {} slots to {} slots complete : took {}ms", DebuggingUtils.toBase2SuffixedString(hashtable.capacity() / ENTRY_SIZE), DebuggingUtils.toBase2SuffixedString(newsize / ENTRY_SIZE), ((float) time) / 1000000); } return newTablePage; } private boolean writeEntry(IntBuffer table, IntBuffer entry) { int start = indexFor(spread(entry.get(KEY_HASHCODE)), table); int tableMask = table.capacity() - 1; for (int 
i = 0; i < reprobeLimit() * ENTRY_SIZE; i += ENTRY_SIZE) { int address = (start + i) & tableMask; int existingStatus = table.get(address + STATUS); if (isTerminating(existingStatus)) { table.position(address); table.put(entry); return true; } else if (isRemoved(existingStatus)) { throw new AssertionError(); } } return false; } protected static int spread(int hash) { int h = hash; h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } private Page allocateTable(int size) { Page newTablePage = tableSource.allocate(size * ENTRY_SIZE * (Integer.SIZE / Byte.SIZE), tableAllocationsSteal, false, null); if (newTablePage != null) { ByteBuffer buffer = newTablePage.asByteBuffer(); byte[] zeros = new byte[1024]; buffer.clear(); while (buffer.hasRemaining()) { if (buffer.remaining() < zeros.length) { buffer.put(zeros, 0, buffer.remaining()); } else { buffer.put(zeros); } } buffer.clear(); } return newTablePage; } private void freeTable(Page tablePage, IntBuffer table, int finalReprobe) { if (hasUsedIterators) { pendingTableFrees.put(table, new PendingPage(tablePage, finalReprobe)); } else { freeTable(tablePage); } } private void freeTable(Page tablePage) { tableSource.free(tablePage); } private int reprobeLimit() { return reprobeLimit; } protected class EntrySet extends AbstractSet<Entry<K, V>> { @Override public Iterator<Map.Entry<K, V>> iterator() { return new EntryIterator(); } @Override public boolean contains(Object o) { if (!(o instanceof Entry<?, ?>)) { return false; } Entry<?, ?> e = (Entry<?, ?>) o; V value = get(e.getKey()); return value != null && value.equals(e.getValue()); } @Override public boolean remove(Object o) { return removeMapping(o); } @Override public int size() { return size; } @Override public void clear() { OffHeapHashMap.this.clear(); } } protected class EncodingSet extends AbstractSet<Long> { @Override public Iterator<Long> iterator() { return new EncodingIterator(); } 
@Override public int size() { return size; } @Override public boolean contains(Object o) { // We could allow the impl from AbstractSet to run but that won't perform as well as you'd expect Set.contains() to run throw new UnsupportedOperationException(); } } protected class KeySet extends AbstractSet<K> { @Override public Iterator<K> iterator() { return new KeyIterator(); } @Override public boolean contains(Object o) { return OffHeapHashMap.this.containsKey(o); } @Override public boolean remove(Object o) { return OffHeapHashMap.this.remove(o) != null; } @Override public int size() { return OffHeapHashMap.this.size(); } @Override public void clear() { OffHeapHashMap.this.clear(); } } protected abstract class HashIterator<T> implements Iterator<T> { final int expectedModCount; // For fast-fail /* * We *must* keep a reference to the original table object that is the key * in the weak map to prevent the table from being freed */ final IntBuffer table; final IntBuffer tableView; T next = null; // next entry to return HashIterator() { hasUsedIterators = true; table = hashtable; tableView = (IntBuffer) table.asReadOnlyBuffer().clear(); expectedModCount = modCount; if (size > 0) { // advance to first entry while (tableView.hasRemaining()) { IntBuffer entry = (IntBuffer) tableView.slice().limit(ENTRY_SIZE); tableView.position(tableView.position() + ENTRY_SIZE); if (isPresent(entry)) { next = create(entry); break; } } } } protected abstract T create(IntBuffer entry); @Override public boolean hasNext() { return next != null; } @Override public T next() { checkForConcurrentModification(); T e = next; if (e == null) { throw new NoSuchElementException(); } next = null; while (tableView.hasRemaining()) { IntBuffer entry = (IntBuffer) tableView.slice().limit(ENTRY_SIZE); tableView.position(tableView.position() + ENTRY_SIZE); if (isPresent(entry)) { next = create(entry); break; } } return e; } @Override public void remove() { throw new UnsupportedOperationException(); } protected 
void checkForConcurrentModification() { if (modCount != expectedModCount) { throw new ConcurrentModificationException(); } } } static class PendingPage { final Page tablePage; final int reprobe; PendingPage(Page tablePage, int reprobe) { this.tablePage = tablePage; this.reprobe = reprobe; } } protected void freePendingTables() { if (hasUsedIterators) { pendingTableFrees.reap(); } } private void updatePendingTables(int hash, long oldEncoding, IntBuffer newEntry) { if (hasUsedIterators) { pendingTableFrees.reap(); Iterator<PendingPage> it = pendingTableFrees.values(); while (it.hasNext()) { PendingPage pending = it.next(); IntBuffer pendingTable = pending.tablePage.asIntBuffer(); pendingTable.position(indexFor(spread(hash), pendingTable)); for (int i = 0; i < pending.reprobe; i++) { if (!pendingTable.hasRemaining()) { pendingTable.rewind(); } IntBuffer entry = (IntBuffer) pendingTable.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { break; } else if (isPresent(entry) && (hash == entry.get(KEY_HASHCODE)) && (oldEncoding == readLong(entry, ENCODING))) { entry.put(newEntry.duplicate()); break; } else { pendingTable.position(pendingTable.position() + ENTRY_SIZE); } } } } } private void wipePendingTables() { if (hasUsedIterators) { pendingTableFrees.reap(); int[] zeros = new int[1024 >> 2]; Iterator<PendingPage> it = pendingTableFrees.values(); while (it.hasNext()) { PendingPage pending = it.next(); IntBuffer pendingTable = pending.tablePage.asIntBuffer(); pendingTable.clear(); while (pendingTable.hasRemaining()) { if (pendingTable.remaining() < zeros.length) { pendingTable.put(zeros, 0, pendingTable.remaining()); } else { pendingTable.put(zeros); } } pendingTable.clear(); } } } protected class KeyIterator extends HashIterator<K> { KeyIterator() { super(); } @Override @SuppressWarnings("unchecked") protected K create(IntBuffer entry) { return (K) storageEngine.readKey(readLong(entry, ENCODING), entry.get(KEY_HASHCODE)); } } protected class EntryIterator extends 
HashIterator<Entry<K, V>> { EntryIterator() { super(); } @Override protected Entry<K, V> create(IntBuffer entry) { return new DirectEntry(entry); } } protected class EncodingIterator extends HashIterator<Long> { EncodingIterator() { super(); } @Override protected Long create(IntBuffer entry) { return readLong(entry, ENCODING); } } class DirectEntry implements Entry<K, V> { private final K key; private final V value; @SuppressWarnings("unchecked") DirectEntry(IntBuffer entry) { this.key = (K) storageEngine.readKey(readLong(entry, ENCODING), entry.get(KEY_HASHCODE)); this.value = (V) storageEngine.readValue(readLong(entry, ENCODING)); } @Override public K getKey() { return key; } @Override public V getValue() { return value; } @Override public V setValue(V value) { throw new UnsupportedOperationException(); } @Override public int hashCode() { return key.hashCode() ^ value.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof Entry<?, ?>) { Entry<?, ?> e = (Entry<?, ?>) o; return key.equals(e.getKey()) && value.equals(e.getValue()); } else { return false; } } @Override public String toString() { return key + "=" + value; } } /* * remove used by EntrySet */ @SuppressWarnings("unchecked") protected boolean removeMapping(Object o) { freePendingTables(); if (!(o instanceof Entry<?, ?>)) { return false; } Entry<K, V> e = (Entry<K, V>) o; Object key = e.getKey(); int hash = key.hashCode(); hashtable.position(indexFor(spread(hash))); for (int i = 0; i < reprobeLimit(); i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return false; } else if (isPresent(entry) && keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE)) && storageEngine.equalsValue(e.getValue(), readLong(entry, ENCODING))) { storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), true); entry.put(STATUS, STATUS_REMOVED); slotRemoved(entry); 
shrink(); return true; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } return false; } @Override public boolean evict(int index, boolean shrink) { return false; } protected void removeAtTableOffset(int offset, boolean shrink) { IntBuffer entry = ((IntBuffer) hashtable.duplicate().position(offset).limit(offset + ENTRY_SIZE)).slice(); if (isPresent(entry)) { storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), true); entry.put(STATUS, STATUS_REMOVED); slotRemoved(entry); if (shrink) { shrink(); } } else { throw new AssertionError(); } } @SuppressWarnings("unchecked") protected V getAtTableOffset(int offset) { IntBuffer entry = ((IntBuffer) hashtable.duplicate().position(offset).limit(offset + ENTRY_SIZE)).slice(); if (isPresent(entry)) { return (V) storageEngine.readValue(readLong(entry, ENCODING)); } else { throw new AssertionError(); } } protected Entry<K, V> getEntryAtTableOffset(int offset) { IntBuffer entry = ((IntBuffer) hashtable.duplicate().position(offset).limit(offset + ENTRY_SIZE)).slice(); if (isPresent(entry)) { return new DirectEntry(entry); } else { throw new AssertionError(); } } @Override public Integer getSlotForHashAndEncoding(int hash, long encoding, long mask) { IntBuffer view = (IntBuffer) hashtable.duplicate().position(indexFor(spread(hash))); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!view.hasRemaining()) { view.rewind(); } IntBuffer entry = (IntBuffer) view.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && (hash == entry.get(KEY_HASHCODE)) && ((encoding & mask) == (readLong(entry, ENCODING) & mask))) { return view.position(); } else { view.position(view.position() + ENTRY_SIZE); } } return null; } @Override public boolean updateEncoding(int hash, long oldEncoding, long newEncoding, long mask) { boolean updated = updateEncodingInTable(hashtable, reprobeLimit(), hash, oldEncoding, newEncoding, mask); if (hasUsedIterators) { 
pendingTableFrees.reap(); Iterator<PendingPage> it = pendingTableFrees.values(); while (it.hasNext()) { PendingPage pending = it.next(); updated |= updateEncodingInTable(pending.tablePage.asIntBuffer(), pending.reprobe, hash, oldEncoding, newEncoding, mask); } } return updated; } private static boolean updateEncodingInTable(IntBuffer table, int limit, int hash, long oldEncoding, long newEncoding, long mask) { table.position(indexFor(spread(hash), table)); for (int i = 0; i < limit; i++) { if (!table.hasRemaining()) { table.rewind(); } IntBuffer entry = (IntBuffer) table.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return false; } else if (isPresent(entry) && (hash == entry.get(KEY_HASHCODE)) && ((oldEncoding & mask) == (readLong(entry, ENCODING) & mask))) { entry.put(createEntry(hash, (readLong(entry, ENCODING) & ~mask) | newEncoding & mask, entry.get(STATUS))); return true; } else { table.position(table.position() + ENTRY_SIZE); } } return false; } @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") private void slotRemoved(IntBuffer entry) { modCount++; removedSlots++; size--; updatePendingTables(entry.get(KEY_HASHCODE), readLong(entry, ENCODING), entry); removed(entry); } @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") private void slotAdded(IntBuffer entry) { modCount++; size++; added(entry); } @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") private void slotUpdated(IntBuffer entry, long oldEncoding) { modCount++; updatePendingTables(entry.get(KEY_HASHCODE), oldEncoding, entry); updated(entry); } protected void added(IntBuffer entry) { //no-op } protected void hit(IntBuffer entry) { //no-op } protected void removed(IntBuffer entry) { //no-op } protected void updated(IntBuffer entry) { //no-op } protected void tableExpansionFailure(int start, int length) { String msg = "Failed to expand table.\n" + "Current Table Size (slots) : " + getTableCapacity() + '\n' + "Resize Will Require : " + DebuggingUtils.toBase2SuffixedString(getTableCapacity() * 
ENTRY_SIZE * (Integer.SIZE / Byte.SIZE) * 2) + "B\n" + "Table Buffer Source : " + tableSource; throw new OversizeMappingException(msg); } protected void storageEngineFailure(Object failure) { String msg = "Storage engine failed to store: " + failure + '\n' + "StorageEngine: " + storageEngine; throw new OversizeMappingException(msg); } /* MapStatistics Methods */ @Override public long getSize() { return size; } @Override public long getTableCapacity() { IntBuffer table = hashtable; return table == null ? 0 : table.capacity() / ENTRY_SIZE; } @Override public long getUsedSlotCount() { return getSize(); } @Override public long getRemovedSlotCount() { return removedSlots; } @Override public int getReprobeLength() { return reprobeLimit(); } @Override public long getAllocatedMemory() { return getDataAllocatedMemory() + (getTableCapacity() * ENTRY_SIZE * (Integer.SIZE / Byte.SIZE)); } @Override public long getOccupiedMemory() { return getDataOccupiedMemory() + (getUsedSlotCount() * ENTRY_SIZE * (Integer.SIZE / Byte.SIZE)); } @Override public long getVitalMemory() { return getDataVitalMemory() + (getTableCapacity() * ENTRY_SIZE * (Integer.SIZE / Byte.SIZE)); } @Override public long getDataAllocatedMemory() { return storageEngine.getAllocatedMemory(); } @Override public long getDataOccupiedMemory() { return storageEngine.getOccupiedMemory(); } @Override public long getDataVitalMemory() { return storageEngine.getVitalMemory(); } @Override public long getDataSize() { return storageEngine.getDataSize(); } @Override public boolean isThiefForTableAllocations() { return tableAllocationsSteal; } @Override public Lock readLock() { return NoOpLock.INSTANCE; } @Override public Lock writeLock() { return NoOpLock.INSTANCE; } public StorageEngine<? super K, ? super V> getStorageEngine() { return storageEngine; } /* * JDK-8-alike metadata methods */ @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") public MetadataTuple<V> computeWithMetadata(K key, BiFunction<? super K, ? 
super MetadataTuple<V>, ? extends MetadataTuple<V>> remappingFunction) { freePendingTables(); int hash = key.hashCode(); IntBuffer originalTable = hashtable; int start = indexFor(spread(hash)); hashtable.position(start); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isAvailable(entry)) { for (IntBuffer laterEntry = entry; i < limit; i++) { if (isTerminating(laterEntry)) { break; } else if (isPresent(laterEntry) && keyEquals(key, hash, readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE))) { long encoding = readLong(laterEntry, ENCODING); @SuppressWarnings("unchecked") MetadataTuple<V> existingValue = metadataTuple( (V) storageEngine.readValue(encoding), laterEntry.get(STATUS) & ~RESERVED_STATUS_BITS); MetadataTuple<V> result = remappingFunction.apply(key, existingValue); if (result == null) { storageEngine.freeMapping(readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE), true); laterEntry.put(STATUS, STATUS_REMOVED); slotRemoved(laterEntry); shrink(); } else if (result == existingValue) { //nop } else if (result.value() == existingValue.value()) { int previous = laterEntry.get(STATUS); laterEntry.put(STATUS, (previous & RESERVED_STATUS_BITS) | (result.metadata() & ~RESERVED_STATUS_BITS)); } else { int [] newEntry = writeEntry(key, hash, result.value(), result.metadata()); if (hashtable != originalTable || !isPresent(laterEntry)) { storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], false); return computeWithMetadata(key, remappingFunction); } storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, result.metadata()); storageEngine.invalidateCache(); storageEngine.freeMapping(readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE), false); long oldEncoding = readLong(laterEntry, ENCODING); laterEntry.put(newEntry); slotUpdated((IntBuffer) laterEntry.flip(), 
oldEncoding); hit(laterEntry); } return result; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } if (!hashtable.hasRemaining()) { hashtable.rewind(); } laterEntry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); } MetadataTuple<V> result = remappingFunction.apply(key, null); if (result != null) { int [] newEntry = writeEntry(key, hash, result.value(), result.metadata()); if (hashtable != originalTable) { storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], false); return computeWithMetadata(key, remappingFunction); } else if (!isAvailable(entry)) { throw new AssertionError(); } if (isRemoved(entry)) { removedSlots--; } storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, result.metadata()); storageEngine.invalidateCache(); entry.put(newEntry); slotAdded(entry); hit(entry); } return result; } else if (keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { long existingEncoding = readLong(entry, ENCODING); int existingStatus = entry.get(STATUS); @SuppressWarnings("unchecked") MetadataTuple<V> existingTuple = metadataTuple( (V) storageEngine.readValue(existingEncoding), existingStatus & ~RESERVED_STATUS_BITS); MetadataTuple<V> result = remappingFunction.apply(key, existingTuple); if (result == null) { storageEngine.freeMapping(existingEncoding, hash, true); entry.put(STATUS, STATUS_REMOVED); slotRemoved(entry); shrink(); } else if (result == existingTuple) { //nop } else if (result.value() == existingTuple.value()) { entry.put(STATUS, (existingStatus & RESERVED_STATUS_BITS) | (result.metadata() & ~RESERVED_STATUS_BITS)); } else { int [] newEntry = writeEntry(key, hash, result.value(), result.metadata()); if (hashtable != originalTable || !isPresent(entry)) { storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], false); return computeWithMetadata(key, remappingFunction); } storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, result.metadata()); 
storageEngine.invalidateCache(); storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), false); entry.put(newEntry); slotUpdated((IntBuffer) entry.flip(), existingEncoding); hit(entry); } return result; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } // hit reprobe limit - must rehash expand(start, limit); return computeWithMetadata(key, remappingFunction); } @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") public MetadataTuple<V> computeIfAbsentWithMetadata(K key, Function<? super K,? extends MetadataTuple<V>> mappingFunction) { freePendingTables(); int hash = key.hashCode(); IntBuffer originalTable = hashtable; int start = indexFor(spread(hash)); hashtable.position(start); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isAvailable(entry)) { for (IntBuffer laterEntry = entry; i < limit; i++) { if (isTerminating(laterEntry)) { break; } else if (isPresent(laterEntry) && keyEquals(key, hash, readLong(laterEntry, ENCODING), laterEntry.get(KEY_HASHCODE))) { @SuppressWarnings("unchecked") MetadataTuple<V> tuple = metadataTuple( (V) storageEngine.readValue(readLong(laterEntry, ENCODING)), laterEntry.get(STATUS) & ~RESERVED_STATUS_BITS); return tuple; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } if (!hashtable.hasRemaining()) { hashtable.rewind(); } laterEntry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); } MetadataTuple<V> result = mappingFunction.apply(key); if (result != null) { int [] newEntry = writeEntry(key, hash, result.value(), result.metadata()); if (hashtable != originalTable) { storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], false); return computeIfAbsentWithMetadata(key, mappingFunction); } else if (!isAvailable(entry)) { throw new AssertionError(); } if (isRemoved(entry)) { removedSlots--; } 
storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, result.metadata()); storageEngine.invalidateCache(); entry.put(newEntry); slotAdded(entry); hit(entry); } return result; } else if (keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { @SuppressWarnings("unchecked") MetadataTuple<V> tuple = metadataTuple( (V) storageEngine.readValue(readLong(entry, ENCODING)), entry.get(STATUS) & ~RESERVED_STATUS_BITS); return tuple; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } // hit reprobe limit - must rehash expand(start, limit); return computeIfAbsentWithMetadata(key, mappingFunction); } @FindbugsSuppressWarnings("VO_VOLATILE_INCREMENT") public MetadataTuple<V> computeIfPresentWithMetadata(K key, BiFunction<? super K,? super MetadataTuple<V>,? extends MetadataTuple<V>> remappingFunction) { freePendingTables(); int hash = key.hashCode(); IntBuffer originalTable = hashtable; int start = indexFor(spread(hash)); hashtable.position(start); int limit = reprobeLimit(); for (int i = 0; i < limit; i++) { if (!hashtable.hasRemaining()) { hashtable.rewind(); } IntBuffer entry = (IntBuffer) hashtable.slice().limit(ENTRY_SIZE); if (isTerminating(entry)) { return null; } else if (isPresent(entry) && keyEquals(key, hash, readLong(entry, ENCODING), entry.get(KEY_HASHCODE))) { long existingEncoding = readLong(entry, ENCODING); int existingStatus = entry.get(STATUS); @SuppressWarnings("unchecked") MetadataTuple<V> existingValue = metadataTuple( (V) storageEngine.readValue(existingEncoding), existingStatus & ~RESERVED_STATUS_BITS); MetadataTuple<V> result = remappingFunction.apply(key, existingValue); if (result == null) { storageEngine.freeMapping(existingEncoding, hash, true); entry.put(STATUS, STATUS_REMOVED); slotRemoved(entry); shrink(); } else if (result == existingValue) { //nop } else if (result.value() == existingValue.value()) { entry.put(STATUS, (existingStatus & RESERVED_STATUS_BITS) | (result.metadata() & 
~RESERVED_STATUS_BITS)); } else { int [] newEntry = writeEntry(key, hash, result.value(), result.metadata()); if (hashtable != originalTable || !isPresent(entry)) { storageEngine.freeMapping(readLong(newEntry, ENCODING), newEntry[KEY_HASHCODE], false); return computeIfPresentWithMetadata(key, remappingFunction); //not exactly ideal - but technically correct } storageEngine.attachedMapping(readLong(newEntry, ENCODING), hash, result.metadata()); storageEngine.invalidateCache(); storageEngine.freeMapping(readLong(entry, ENCODING), entry.get(KEY_HASHCODE), false); entry.put(newEntry); slotUpdated((IntBuffer) entry.flip(), readLong(entry, ENCODING)); hit(entry); } return result; } else { hashtable.position(hashtable.position() + ENTRY_SIZE); } } return null; } }