/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.cassandra.cache;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;

import com.github.benmanes.caffeine.cache.*;
import com.codahale.metrics.Timer;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.CacheMissMetrics;
import org.apache.cassandra.utils.memory.BufferPool;

public class ChunkCache
        implements CacheLoader<ChunkCache.Key, ChunkCache.Buffer>, RemovalListener<ChunkCache.Key, ChunkCache.Buffer>, CacheSize
{
    public static final int RESERVED_POOL_SPACE_IN_MB = 32;
    public static final long cacheSize = 1024L * 1024L * Math.max(0, DatabaseDescriptor.getFileCacheSizeInMB() - RESERVED_POOL_SPACE_IN_MB);
    public static final boolean roundUp = DatabaseDescriptor.getFileCacheRoundUp();

    private static boolean enabled = cacheSize > 0;
    public static final ChunkCache instance = enabled ? new ChunkCache() : null;

    private final LoadingCache<Key, Buffer> cache;
    public final CacheMissMetrics metrics;

    static class Key
    {
        final ChunkReader file;
        final String path;
        final long position;

        public Key(ChunkReader file, long position)
        {
            super();
            this.file = file;
            this.position = position;
            this.path = file.channel().filePath();
        }

        public int hashCode()
        {
            final int prime = 31;
            int result = 1;
            result = prime * result + path.hashCode();
            result = prime * result + file.getClass().hashCode();
            result = prime * result + Long.hashCode(position);
            return result;
        }

        public boolean equals(Object obj)
        {
            if (this == obj)
                return true;
            if (obj == null)
                return false;

            Key other = (Key) obj;
            return (position == other.position)
                    && file.getClass() == other.file.getClass()
                    && path.equals(other.path);
        }
    }

    static class Buffer implements Rebufferer.BufferHolder
    {
        private final ByteBuffer buffer;
        private final long offset;
        private final AtomicInteger references;

        public Buffer(ByteBuffer buffer, long offset)
        {
            this.buffer = buffer;
            this.offset = offset;
            references = new AtomicInteger(1);  // start referenced.
        }

        Buffer reference()
        {
            int refCount;
            do
            {
                refCount = references.get();
                if (refCount == 0)
                    // Buffer was released before we managed to reference it.
                    return null;
            } while (!references.compareAndSet(refCount, refCount + 1));

            return this;
        }

        @Override
        public ByteBuffer buffer()
        {
            assert references.get() > 0;
            return buffer.duplicate();
        }

        @Override
        public long offset()
        {
            return offset;
        }

        @Override
        public void release()
        {
            if (references.decrementAndGet() == 0)
                BufferPool.put(buffer);
        }
    }

    public ChunkCache()
    {
        cache = Caffeine.newBuilder()
                .maximumWeight(cacheSize)
                .executor(MoreExecutors.directExecutor())
                .weigher((key, buffer) -> ((Buffer) buffer).buffer.capacity())
                .removalListener(this)
                .build(this);
        metrics = new CacheMissMetrics("ChunkCache", this);
    }

    @Override
    public Buffer load(Key key) throws Exception
    {
        ChunkReader rebufferer = key.file;
        metrics.misses.mark();
        try (Timer.Context ctx = metrics.missLatency.time())
        {
            ByteBuffer buffer = BufferPool.get(key.file.chunkSize(), key.file.preferredBufferType());
            assert buffer != null;
            rebufferer.readChunk(key.position, buffer);
            return new Buffer(buffer, key.position);
        }
    }

    @Override
    public void onRemoval(Key key, Buffer buffer, RemovalCause cause)
    {
        buffer.release();
    }

    public void close()
    {
        cache.invalidateAll();
    }

    public RebuffererFactory wrap(ChunkReader file)
    {
        return new CachingRebufferer(file);
    }

    public static RebuffererFactory maybeWrap(ChunkReader file)
    {
        if (!enabled)
            return file;

        return instance.wrap(file);
    }

    public void invalidatePosition(FileHandle dfile, long position)
    {
        if (!(dfile.rebuffererFactory() instanceof CachingRebufferer))
            return;

        ((CachingRebufferer) dfile.rebuffererFactory()).invalidate(position);
    }

    public void invalidateFile(String fileName)
    {
        cache.invalidateAll(Iterables.filter(cache.asMap().keySet(), x -> x.path.equals(fileName)));
    }

    @VisibleForTesting
    public void enable(boolean enabled)
    {
        ChunkCache.enabled = enabled;
        cache.invalidateAll();
        metrics.reset();
    }

    // TODO: Invalidate caches for obsoleted/MOVED_START tables?

    
Rebufferer providing cached chunks where data is obtained from the specified ChunkReader. Thread-safe. One instance per SegmentedFile, created by ChunkCache.maybeWrap if the cache is enabled.
/** * Rebufferer providing cached chunks where data is obtained from the specified ChunkReader. * Thread-safe. One instance per SegmentedFile, created by ChunkCache.maybeWrap if the cache is enabled. */
class CachingRebufferer implements Rebufferer, RebuffererFactory { private final ChunkReader source; final long alignmentMask; public CachingRebufferer(ChunkReader file) { source = file; int chunkSize = file.chunkSize(); assert Integer.bitCount(chunkSize) == 1 : String.format("%d must be a power of two", chunkSize); alignmentMask = -chunkSize; } @Override public Buffer rebuffer(long position) { try { metrics.requests.mark(); long pageAlignedPos = position & alignmentMask; Buffer buf; do buf = cache.get(new Key(source, pageAlignedPos)).reference(); while (buf == null); return buf; } catch (Throwable t) { Throwables.propagateIfInstanceOf(t.getCause(), CorruptSSTableException.class); throw Throwables.propagate(t); } } public void invalidate(long position) { long pageAlignedPos = position & alignmentMask; cache.invalidate(new Key(source, pageAlignedPos)); } @Override public Rebufferer instantiateRebufferer() { return this; } @Override public void close() { source.close(); } @Override public void closeReader() { // Instance is shared among readers. Nothing to release. } @Override public ChannelProxy channel() { return source.channel(); } @Override public long fileLength() { return source.fileLength(); } @Override public double getCrcCheckChance() { return source.getCrcCheckChance(); } @Override public String toString() { return "CachingRebufferer:" + source.toString(); } } @Override public long capacity() { return cacheSize; } @Override public void setCapacity(long capacity) { throw new UnsupportedOperationException("Chunk cache size cannot be changed."); } @Override public int size() { return cache.asMap().size(); } @Override public long weightedSize() { return cache.policy().eviction() .map(policy -> policy.weightedSize().orElseGet(cache::estimatedSize)) .orElseGet(cache::estimatedSize); } }