/*
 * Copyright (C) 2008-2011, Google Inc.
 * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.dfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.LongStream;

import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackExt;

/**
 * Caches slices of a
 * {@link org.eclipse.jgit.internal.storage.dfs.BlockBasedFile} in memory for
 * faster read access.
 * <p>
 * The DfsBlockCache serves as a Java based "buffer cache", loading segments of
 * a BlockBasedFile into the JVM heap prior to use. As JGit often wants to do
 * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out
 * these tiny reads into larger block-sized IO operations.
 * <p>
 * Whenever a cache miss occurs, loading is invoked by exactly one thread for
 * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by
 * an array of locks, with the tuple hashed to a lock instance.
 * <p>
 * It's too expensive during object access to be accurate with a least recently
 * used (LRU) algorithm. Strictly ordering every read is a lot of overhead that
 * typically doesn't yield a corresponding benefit to the application. This
 * cache implements a clock replacement algorithm, giving each block one chance
 * to have been accessed during a sweep of the cache to save itself from
 * eviction.
 * <p>
 * Entities created by the cache are held under hard references, preventing the
 * Java VM from clearing anything. Blocks are discarded by the replacement
 * algorithm when adding a new block would cause the cache to exceed its
 * configured maximum size.
 * <p>
 * The key tuple is passed through to methods as a pair of parameters rather
 * than as a single Object, thus reducing the transient memory allocations of
 * callers. It is more efficient to avoid the allocation, as we can't be 100%
 * sure that a JIT would be able to stack-allocate a key tuple.
 * <p>
 * The internal hash table does not expand at runtime, instead it is fixed in
 * size at cache creation time. The internal lock table used to gate load
 * invocations is also fixed in size.
 */
public final class DfsBlockCache {
    private static volatile DfsBlockCache cache;

    static {
        reconfigure(new DfsBlockCacheConfig());
    }
    /**
     * Modify the configuration of the window cache.
     * <p>
     * The new configuration is applied immediately, and the existing cache is
     * cleared.
     *
     * @param cfg
     *            the new window cache configuration.
     * @throws java.lang.IllegalArgumentException
     *             the cache configuration contains one or more invalid
     *             settings, usually too low of a limit.
     */
    public static void reconfigure(DfsBlockCacheConfig cfg) {
        cache = new DfsBlockCache(cfg);
    }
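    /*
     * Illustrative sketch (not part of this class): one way a host application
     * might size and install the cache at startup. The DfsBlockCacheConfig
     * setters below come from this package's configuration API; the limits
     * chosen are arbitrary example values.
     *
     *   DfsBlockCacheConfig cfg = new DfsBlockCacheConfig();
     *   cfg.setBlockLimit(256 * 1024 * 1024); // cap the cache at 256 MiB
     *   cfg.setBlockSize(64 * 1024); // 64 KiB blocks; must be a power of 2
     *   cfg.setConcurrencyLevel(32); // sizes the load/ref lock tables
     *   DfsBlockCache.reconfigure(cfg);
     *   DfsBlockCache cache = DfsBlockCache.getInstance();
     */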
    /**
     * Get the currently active DfsBlockCache.
     *
     * @return the currently active DfsBlockCache.
     */
    public static DfsBlockCache getInstance() {
        return cache;
    }
    /** Number of entries in {@link #table}. */
    private final int tableSize;
    /** Hash bucket directory; entries are chained below. */
    private final AtomicReferenceArray<HashEntry> table;
    /**
     * Locks to prevent concurrent loads for same (PackFile,position) block. The
     * number of locks is {@link DfsBlockCacheConfig#getConcurrencyLevel()} to
     * cap the overall concurrent block loads.
     */
    private final ReentrantLock[] loadLocks;
    /**
     * A separate pool of locks to prevent concurrent loads for same index or
     * bitmap from PackFile.
     */
    private final ReentrantLock[] refLocks;
    /** Maximum number of bytes the cache should hold. */
    private final long maxBytes;
    /** Pack files smaller than this size can be copied through the cache. */
    private final long maxStreamThroughCache;
    /**
     * Suggested block size to read from pack files in.
     * <p>
     * If a pack file does not have a native block size, this size will be used.
     * <p>
     * If a pack file has a native size, a whole multiple of the native size
     * will be used until it matches this size.
     * <p>
     * The value for blockSize must be a power of 2.
     */
    private final int blockSize;
    /** As {@link #blockSize} is a power of 2, bits to shift for a / blockSize. */
    private final int blockSizeShift;
    /**
     * Number of times a block was found in the cache, per pack file extension.
     */
    private final AtomicReference<AtomicLong[]> statHit;
    /**
     * Number of times a block was not found, and had to be loaded, per pack
     * file extension.
     */
    private final AtomicReference<AtomicLong[]> statMiss;
    /**
     * Number of blocks evicted due to cache being full, per pack file
     * extension.
     */
    private final AtomicReference<AtomicLong[]> statEvict;
    /**
     * Number of bytes currently loaded in the cache, per pack file extension.
     */
    private final AtomicReference<AtomicLong[]> liveBytes;
    /** Protects the clock and its related data. */
    private final ReentrantLock clockLock;
    /**
     * A consumer of object reference lock wait time milliseconds. May be used
     * to build a metric.
     */
    private final Consumer<Long> refLockWaitTime;
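    /*
     * Illustrative sketch (not part of this class): the consumer is supplied
     * via DfsBlockCacheConfig#setRefLockWaitTimeConsumer. "metrics" below is a
     * hypothetical sink owned by the host application.
     *
     *   DfsBlockCacheConfig cfg = new DfsBlockCacheConfig();
     *   cfg.setRefLockWaitTimeConsumer(
     *       waitMillis -> metrics.recordRefLockWaitMillis(waitMillis));
     *   DfsBlockCache.reconfigure(cfg);
     */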
    /** Current position of the clock. */
    private Ref clockHand;

    @SuppressWarnings("unchecked")
    private DfsBlockCache(DfsBlockCacheConfig cfg) {
        tableSize = tableSize(cfg);
        if (tableSize < 1) {
            throw new IllegalArgumentException(
                    JGitText.get().tSizeMustBeGreaterOrEqual1);
        }

        table = new AtomicReferenceArray<>(tableSize);
        loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
        for (int i = 0; i < loadLocks.length; i++) {
            loadLocks[i] = new ReentrantLock(true /* fair */);
        }
        refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
        for (int i = 0; i < refLocks.length; i++) {
            refLocks[i] = new ReentrantLock(true /* fair */);
        }

        maxBytes = cfg.getBlockLimit();
        maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
        blockSize = cfg.getBlockSize();
        blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

        clockLock = new ReentrantLock(true /* fair */);
        String none = ""; //$NON-NLS-1$
        clockHand = new Ref<>(
                DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
                -1, 0, null);
        clockHand.next = clockHand;

        statHit = new AtomicReference<>(newCounters());
        statMiss = new AtomicReference<>(newCounters());
        statEvict = new AtomicReference<>(newCounters());
        liveBytes = new AtomicReference<>(newCounters());

        refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
    }

    boolean shouldCopyThroughCache(long length) {
        return length <= maxStreamThroughCache;
    }
    /**
     * Get total number of bytes in the cache, per pack file extension.
     *
     * @return total number of bytes in the cache, per pack file extension.
     */
    public long[] getCurrentSize() {
        return getStatVals(liveBytes);
    }
    /**
     * Get 0..100, defining how full the cache is.
     *
     * @return 0..100, defining how full the cache is.
     */
    public long getFillPercentage() {
        return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
    }
    /**
     * Get number of requests for items in the cache, per pack file extension.
     *
     * @return number of requests for items in the cache, per pack file
     *         extension.
     */
    public long[] getHitCount() {
        return getStatVals(statHit);
    }
    /**
     * Get number of requests for items not in the cache, per pack file
     * extension.
     *
     * @return number of requests for items not in the cache, per pack file
     *         extension.
     */
    public long[] getMissCount() {
        return getStatVals(statMiss);
    }
    /**
     * Get total number of requests (hit + miss), per pack file extension.
     *
     * @return total number of requests (hit + miss), per pack file extension.
     */
    public long[] getTotalRequestCount() {
        AtomicLong[] hit = statHit.get();
        AtomicLong[] miss = statMiss.get();
        long[] cnt = new long[Math.max(hit.length, miss.length)];
        for (int i = 0; i < hit.length; i++) {
            cnt[i] += hit[i].get();
        }
        for (int i = 0; i < miss.length; i++) {
            cnt[i] += miss[i].get();
        }
        return cnt;
    }
    /**
     * Get the hit ratio as a percentage (0..100), per pack file extension.
     *
     * @return hit ratio (0..100), per pack file extension.
     */
    public long[] getHitRatio() {
        AtomicLong[] hit = statHit.get();
        AtomicLong[] miss = statMiss.get();
        long[] ratio = new long[Math.max(hit.length, miss.length)];
        for (int i = 0; i < ratio.length; i++) {
            if (i >= hit.length) {
                ratio[i] = 0;
            } else if (i >= miss.length) {
                ratio[i] = 100;
            } else {
                long hitVal = hit[i].get();
                long missVal = miss[i].get();
                long total = hitVal + missVal;
                ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
            }
        }
        return ratio;
    }
    /**
     * Get number of evictions performed due to cache being full, per pack file
     * extension.
     *
     * @return number of evictions performed due to cache being full, per pack
     *         file extension.
     */
    public long[] getEvictions() {
        return getStatVals(statEvict);
    }
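    /*
     * Illustrative sketch (not part of this class): reading the statistics.
     * Every returned array is indexed by pack file extension position, so
     * PackExt#getPosition() selects the slot for a given extension.
     *
     *   DfsBlockCache cache = DfsBlockCache.getInstance();
     *   long[] ratio = cache.getHitRatio();
     *   int pack = PackExt.PACK.getPosition();
     *   System.out.println("pack hit ratio: " + ratio[pack] + "%");
     *   System.out.println("cache fill: " + cache.getFillPercentage() + "%");
     */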
    /**
     * Quickly check if the cache contains block 0 of the given stream.
     * <p>
     * This can be useful for sophisticated pre-read algorithms to quickly
     * determine if a file is likely already in cache, especially small
     * reftables which may be smaller than a typical DFS block size.
     *
     * @param key
     *            the file to check.
     * @return true if block 0 (the first block) is in the cache.
     */
    public boolean hasBlock0(DfsStreamKey key) {
        HashEntry e1 = table.get(slot(key, 0));
        DfsBlock v = scan(e1, key, 0);
        return v != null && v.contains(key, 0);
    }

    private int hash(int packHash, long off) {
        return packHash + (int) (off >>> blockSizeShift);
    }

    int getBlockSize() {
        return blockSize;
    }

    private static int tableSize(DfsBlockCacheConfig cfg) {
        final int wsz = cfg.getBlockSize();
        final long limit = cfg.getBlockLimit();
        if (wsz <= 0) {
            throw new IllegalArgumentException(
                    JGitText.get().invalidWindowSize);
        }
        if (limit < wsz) {
            throw new IllegalArgumentException(
                    JGitText.get().windowSizeMustBeLesserThanLimit);
        }
        return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
    }
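    /*
     * Illustrative sketch (not part of this class): a pre-read heuristic could
     * consult hasBlock0 before scheduling prefetch IO. "key" stands for the
     * DfsStreamKey of a small reftable; the prefetch itself is hypothetical.
     *
     *   if (!DfsBlockCache.getInstance().hasBlock0(key)) {
     *       // block 0 not cached; the file is likely cold, prefetch it
     *   }
     */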
    /**
     * Look up a cached object, creating and loading it if it doesn't exist.
     *
     * @param file
     *            the pack that "contains" the cached object.
     * @param position
     *            offset within <code>pack</code> of the object.
     * @param ctx
     *            current thread's reader.
     * @param fileChannel
     *            supplier for channel to read {@code pack}.
     * @return the object reference.
     * @throws IOException
     *             the reference was not in the cache and could not be loaded.
     */
    DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
            ReadableChannelSupplier fileChannel) throws IOException {
        final long requestedPosition = position;
        position = file.alignToBlock(position);

        DfsStreamKey key = file.key;
        int slot = slot(key, position);
        HashEntry e1 = table.get(slot);
        DfsBlock v = scan(e1, key, position);
        if (v != null && v.contains(key, requestedPosition)) {
            ctx.stats.blockCacheHit++;
            getStat(statHit, key).incrementAndGet();
            return v;
        }

        reserveSpace(blockSize, key);
        ReentrantLock regionLock = lockFor(key, position);
        regionLock.lock();
        try {
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                v = scan(e2, key, position);
                if (v != null) {
                    ctx.stats.blockCacheHit++;
                    getStat(statHit, key).incrementAndGet();
                    creditSpace(blockSize, key);
                    return v;
                }
            }

            getStat(statMiss, key).incrementAndGet();
            boolean credit = true;
            try {
                v = file.readOneBlock(requestedPosition, ctx,
                        fileChannel.get());
                credit = false;
            } finally {
                if (credit) {
                    creditSpace(blockSize, key);
                }
            }
            if (position != v.start) {
                // The file discovered its blockSize and adjusted.
                position = v.start;
                slot = slot(key, position);
                e2 = table.get(slot);
            }

            Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
            ref.hot = true;
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            addToClock(ref, blockSize - v.size());
        } finally {
            regionLock.unlock();
        }

        // If the block size changed from the default, it is possible the block
        // that was loaded is the wrong block for the requested position.
        if (v.contains(file.key, requestedPosition)) {
            return v;
        }
        return getOrLoad(file, requestedPosition, ctx, fileChannel);
    }

    @SuppressWarnings("unchecked")
    private void reserveSpace(int reserve, DfsStreamKey key) {
        clockLock.lock();
        try {
            long live = LongStream.of(getCurrentSize()).sum() + reserve;
            if (maxBytes < live) {
                Ref prev = clockHand;
                Ref hand = clockHand.next;
                do {
                    if (hand.hot) {
                        // Value was recently touched. Clear
                        // hot and give it another chance.
                        hand.hot = false;
                        prev = hand;
                        hand = hand.next;
                        continue;
                    } else if (prev == hand) {
                        break;
                    }

                    // No recent access since last scan, kill
                    // value and remove from clock.
                    Ref dead = hand;
                    hand = hand.next;
                    prev.next = hand;
                    dead.next = null;
                    dead.value = null;
                    live -= dead.size;
                    getStat(liveBytes, dead.key).addAndGet(-dead.size);
                    getStat(statEvict, dead.key).incrementAndGet();
                } while (maxBytes < live);
                clockHand = prev;
            }
            getStat(liveBytes, key).addAndGet(reserve);
        } finally {
            clockLock.unlock();
        }
    }

    private void creditSpace(int credit, DfsStreamKey key) {
        clockLock.lock();
        try {
            getStat(liveBytes, key).addAndGet(-credit);
        } finally {
            clockLock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    private void addToClock(Ref ref, int credit) {
        clockLock.lock();
        try {
            if (credit != 0) {
                getStat(liveBytes, ref.key).addAndGet(-credit);
            }
            Ref ptr = clockHand;
            ref.next = ptr.next;
            ptr.next = ref;
            clockHand = ref;
        } finally {
            clockLock.unlock();
        }
    }

    void put(DfsBlock v) {
        put(v.stream, v.start, v.size(), v);
    }
    /**
     * Look up a cached object, creating and loading it if it doesn't exist.
     *
     * @param key
     *            the stream key of the pack.
     * @param loader
     *            the function to load the reference.
     * @return the object reference.
     * @throws IOException
     *             the reference was not in the cache and could not be loaded.
     */
    <T> Ref<T> getOrLoadRef(DfsStreamKey key, RefLoader<T> loader)
            throws IOException {
        int slot = slot(key, 0);
        HashEntry e1 = table.get(slot);
        Ref<T> ref = scanRef(e1, key, 0);
        if (ref != null) {
            getStat(statHit, key).incrementAndGet();
            return ref;
        }

        ReentrantLock regionLock = lockForRef(key);
        long lockStart = System.currentTimeMillis();
        regionLock.lock();
        try {
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                ref = scanRef(e2, key, 0);
                if (ref != null) {
                    getStat(statHit, key).incrementAndGet();
                    return ref;
                }
            }

            if (refLockWaitTime != null) {
                refLockWaitTime.accept(
                        Long.valueOf(System.currentTimeMillis() - lockStart));
            }
            getStat(statMiss, key).incrementAndGet();
            ref = loader.load();
            ref.hot = true;
            // Reserve after loading to get the size of the object
            reserveSpace(ref.size, key);
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            addToClock(ref, 0);
        } finally {
            regionLock.unlock();
        }
        return ref;
    }

    <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
        return put(key, 0, (int) Math.min(size, Integer.MAX_VALUE), v);
    }

    <T> Ref<T> put(DfsStreamKey key, long pos, int size, T v) {
        int slot = slot(key, pos);
        HashEntry e1 = table.get(slot);
        Ref<T> ref = scanRef(e1, key, pos);
        if (ref != null) {
            return ref;
        }

        reserveSpace(size, key);
        ReentrantLock regionLock = lockFor(key, pos);
        regionLock.lock();
        try {
            HashEntry e2 = table.get(slot);
            if (e2 != e1) {
                ref = scanRef(e2, key, pos);
                if (ref != null) {
                    creditSpace(size, key);
                    return ref;
                }
            }

            ref = new Ref<>(key, pos, size, v);
            ref.hot = true;
            for (;;) {
                HashEntry n = new HashEntry(clean(e2), ref);
                if (table.compareAndSet(slot, e2, n)) {
                    break;
                }
                e2 = table.get(slot);
            }
            addToClock(ref, 0);
        } finally {
            regionLock.unlock();
        }
        return ref;
    }

    boolean contains(DfsStreamKey key, long position) {
        return scan(table.get(slot(key, position)), key, position) != null;
    }

    @SuppressWarnings("unchecked")
    <T> T get(DfsStreamKey key, long position) {
        T val = (T) scan(table.get(slot(key, position)), key, position);
        if (val == null) {
            getStat(statMiss, key).incrementAndGet();
        } else {
            getStat(statHit, key).incrementAndGet();
        }
        return val;
    }

    private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
        Ref<T> r = scanRef(n, key, position);
        return r != null ? r.get() : null;
    }

    @SuppressWarnings("unchecked")
    private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
        for (; n != null; n = n.next) {
            Ref<T> r = n.ref;
            if (r.position == position && r.key.equals(key)) {
                return r.get() != null ? r : null;
            }
        }
        return null;
    }

    private int slot(DfsStreamKey key, long position) {
        return (hash(key.hash, position) >>> 1) % tableSize;
    }

    private ReentrantLock lockFor(DfsStreamKey key, long position) {
        return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
    }

    private ReentrantLock lockForRef(DfsStreamKey key) {
        return refLocks[(key.hash >>> 1) % refLocks.length];
    }

    private static AtomicLong[] newCounters() {
        AtomicLong[] ret = new AtomicLong[PackExt.values().length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new AtomicLong();
        }
        return ret;
    }

    private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
            DfsStreamKey key) {
        int pos = key.packExtPos;
        while (true) {
            AtomicLong[] vals = stats.get();
            if (pos < vals.length) {
                return vals[pos];
            }
            AtomicLong[] expect = vals;
            vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
            System.arraycopy(expect, 0, vals, 0, expect.length);
            for (int i = expect.length; i < vals.length; i++) {
                vals[i] = new AtomicLong();
            }
            if (stats.compareAndSet(expect, vals)) {
                return vals[pos];
            }
        }
    }

    private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
        AtomicLong[] stats = stat.get();
        long[] cnt = new long[stats.length];
        for (int i = 0; i < stats.length; i++) {
            cnt[i] = stats[i].get();
        }
        return cnt;
    }

    private static HashEntry clean(HashEntry top) {
        while (top != null && top.ref.next == null) {
            top = top.next;
        }
        if (top == null) {
            return null;
        }
        HashEntry n = clean(top.next);
        return n == top.next ? top : new HashEntry(n, top.ref);
    }

    private static final class HashEntry {
        /** Next entry in the hash table's chain list. */
        final HashEntry next;
        /** The referenced object. */
        final Ref ref;

        HashEntry(HashEntry n, Ref r) {
            next = n;
            ref = r;
        }
    }

    static final class Ref<T> {
        final DfsStreamKey key;
        final long position;
        final int size;
        volatile T value;
        Ref next;
        volatile boolean hot;

        Ref(DfsStreamKey key, long position, int size, T v) {
            this.key = key;
            this.position = position;
            this.size = size;
            this.value = v;
        }

        T get() {
            T v = value;
            if (v != null) {
                hot = true;
            }
            return v;
        }

        boolean has() {
            return value != null;
        }
    }

    @FunctionalInterface
    interface RefLoader<T> {
        Ref<T> load() throws IOException;
    }
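    /*
     * Illustrative sketch (internal usage, hypothetical loader): getOrLoadRef
     * pins a whole object, such as a pack index, under position 0 of its
     * stream key. "readIndex" and "indexSizeInBytes" are made-up helpers.
     *
     *   Ref<PackIndex> ref = cache.getOrLoadRef(idxKey, () -> {
     *       PackIndex idx = readIndex();
     *       return new Ref<>(idxKey, 0, indexSizeInBytes(idx), idx);
     *   });
     *   PackIndex idx = ref.get();
     */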
    /**
     * Supplier for a readable channel.
     */
    @FunctionalInterface
    interface ReadableChannelSupplier {
        /**
         * @return ReadableChannel
         * @throws IOException
         *             if the channel could not be opened
         */
        ReadableChannel get() throws IOException;
    }
}