/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.db.compaction;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.lang.ref.WeakReference;
import java.nio.file.*;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

import com.google.common.collect.MapMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.NoSpamLogger;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;

public class CompactionLogger
{
    public interface Strategy
    {
        JsonNode sstable(SSTableReader sstable);

        JsonNode options();

        static Strategy none = new Strategy()
        {
            public JsonNode sstable(SSTableReader sstable)
            {
                return null;
            }

            public JsonNode options()
            {
                return null;
            }
        };
    }

    
This will produce the compaction strategy's starting information.
/** * This will produce the compaction strategy's starting information. */
public interface StrategySummary { JsonNode getSummary(); }
This is an interface to allow writing to a different interface.
/** * This is an interface to allow writing to a different interface. */
public interface Writer {
This is used when we are already trying to write out the start of a
Params:
  • statement – This should be written out to the medium capturing the logs
  • tag – This is an identifier for a strategy; each strategy should have a distinct Object
/** * This is used when we are already trying to write out the start of a * @param statement This should be written out to the medium capturing the logs * @param tag This is an identifier for a strategy; each strategy should have a distinct Object */
void writeStart(JsonNode statement, Object tag);
Params:
  • statement – This should be written out to the medium capturing the logs
  • summary – This can be used when a tag is not recognized by this writer; this can be because the file has been rolled, or otherwise the writer had to start over
  • tag – This is an identifier for a strategy; each strategy should have a distinct Object
/** * @param statement This should be written out to the medium capturing the logs * @param summary This can be used when a tag is not recognized by this writer; this can be because the file * has been rolled, or otherwise the writer had to start over * @param tag This is an identifier for a strategy; each strategy should have a distinct Object */
void write(JsonNode statement, StrategySummary summary, Object tag); } private interface CompactionStrategyAndTableFunction { JsonNode apply(AbstractCompactionStrategy strategy, SSTableReader sstable); } private static final JsonNodeFactory json = JsonNodeFactory.instance; private static final Logger logger = LoggerFactory.getLogger(CompactionLogger.class); private static final Writer serializer = new CompactionLogSerializer(); private final WeakReference<ColumnFamilyStore> cfsRef; private final WeakReference<CompactionStrategyManager> csmRef; private final AtomicInteger identifier = new AtomicInteger(0); private final Map<AbstractCompactionStrategy, String> compactionStrategyMapping = new MapMaker().weakKeys().makeMap(); private final AtomicBoolean enabled = new AtomicBoolean(false); public CompactionLogger(ColumnFamilyStore cfs, CompactionStrategyManager csm) { csmRef = new WeakReference<>(csm); cfsRef = new WeakReference<>(cfs); } private void forEach(Consumer<AbstractCompactionStrategy> consumer) { CompactionStrategyManager csm = csmRef.get(); if (csm == null) return; csm.getStrategies() .forEach(l -> l.forEach(consumer)); } private ArrayNode compactionStrategyMap(Function<AbstractCompactionStrategy, JsonNode> select) { ArrayNode node = json.arrayNode(); forEach(acs -> node.add(select.apply(acs))); return node; } private ArrayNode sstableMap(Collection<SSTableReader> sstables, CompactionStrategyAndTableFunction csatf) { CompactionStrategyManager csm = csmRef.get(); ArrayNode node = json.arrayNode(); if (csm == null) return node; sstables.forEach(t -> node.add(csatf.apply(csm.getCompactionStrategyFor(t), t))); return node; } private String getId(AbstractCompactionStrategy strategy) { return compactionStrategyMapping.computeIfAbsent(strategy, s -> String.valueOf(identifier.getAndIncrement())); } private JsonNode formatSSTables(AbstractCompactionStrategy strategy) { ArrayNode node = json.arrayNode(); CompactionStrategyManager csm = csmRef.get(); ColumnFamilyStore cfs = cfsRef.get(); if (csm == null || cfs == null) return node; for (SSTableReader sstable : cfs.getLiveSSTables()) { if (csm.getCompactionStrategyFor(sstable) == strategy) node.add(formatSSTable(strategy, sstable)); } return node; } private JsonNode formatSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable) { ObjectNode node = json.objectNode(); node.put("generation", sstable.descriptor.generation); node.put("version", sstable.descriptor.version.getVersion()); node.put("size", sstable.onDiskLength()); JsonNode logResult = strategy.strategyLogger().sstable(sstable); if (logResult != null) node.put("details", logResult); return node; } private JsonNode startStrategy(AbstractCompactionStrategy strategy) { ObjectNode node = json.objectNode(); CompactionStrategyManager csm = csmRef.get(); if (csm == null) return node; node.put("strategyId", getId(strategy)); node.put("type", strategy.getName()); node.put("tables", formatSSTables(strategy)); node.put("repaired", csm.isRepaired(strategy)); List<String> folders = csm.getStrategyFolders(strategy); ArrayNode folderNode = json.arrayNode(); for (String folder : folders) { folderNode.add(folder); } node.put("folders", folderNode); JsonNode logResult = strategy.strategyLogger().options(); if (logResult != null) node.put("options", logResult); return node; } private JsonNode shutdownStrategy(AbstractCompactionStrategy strategy) { ObjectNode node = json.objectNode(); node.put("strategyId", getId(strategy)); return node; } private JsonNode describeSSTable(AbstractCompactionStrategy strategy, SSTableReader sstable) { ObjectNode node = json.objectNode(); node.put("strategyId", getId(strategy)); node.put("table", formatSSTable(strategy, sstable)); return node; } private void describeStrategy(ObjectNode node) { ColumnFamilyStore cfs = cfsRef.get(); if (cfs == null) return; node.put("keyspace", cfs.keyspace.getName()); node.put("table", cfs.getTableName()); node.put("time", System.currentTimeMillis()); } private JsonNode startStrategies() { ObjectNode node = json.objectNode(); node.put("type", "enable"); describeStrategy(node); node.put("strategies", compactionStrategyMap(this::startStrategy)); return node; } public void enable() { if (enabled.compareAndSet(false, true)) { serializer.writeStart(startStrategies(), this); } } public void disable() { if (enabled.compareAndSet(true, false)) { ObjectNode node = json.objectNode(); node.put("type", "disable"); describeStrategy(node); node.put("strategies", compactionStrategyMap(this::shutdownStrategy)); serializer.write(node, this::startStrategies, this); } } public void flush(Collection<SSTableReader> sstables) { if (enabled.get()) { ObjectNode node = json.objectNode(); node.put("type", "flush"); describeStrategy(node); node.put("tables", sstableMap(sstables, this::describeSSTable)); serializer.write(node, this::startStrategies, this); } } public void compaction(long startTime, Collection<SSTableReader> input, long endTime, Collection<SSTableReader> output) { if (enabled.get()) { ObjectNode node = json.objectNode(); node.put("type", "compaction"); describeStrategy(node); node.put("start", String.valueOf(startTime)); node.put("end", String.valueOf(endTime)); node.put("input", sstableMap(input, this::describeSSTable)); node.put("output", sstableMap(output, this::describeSSTable)); serializer.write(node, this::startStrategies, this); } } public void pending(AbstractCompactionStrategy strategy, int remaining) { if (remaining != 0 && enabled.get()) { ObjectNode node = json.objectNode(); node.put("type", "pending"); describeStrategy(node); node.put("strategyId", getId(strategy)); node.put("pending", remaining); serializer.write(node, this::startStrategies, this); } } private static class CompactionLogSerializer implements Writer { private static final String logDirectory = System.getProperty("cassandra.logdir", "."); private final ExecutorService loggerService = Executors.newFixedThreadPool(1); // This is only accessed on the logger service thread, so it does not need to be thread safe private final Set<Object> rolled = new HashSet<>(); private OutputStreamWriter stream; private static OutputStreamWriter createStream() throws IOException { int count = 0; Path compactionLog = Paths.get(logDirectory, "compaction.log"); if (Files.exists(compactionLog)) { Path tryPath = compactionLog; while (Files.exists(tryPath)) { tryPath = Paths.get(logDirectory, String.format("compaction-%d.log", count++)); } Files.move(compactionLog, tryPath); } return new OutputStreamWriter(Files.newOutputStream(compactionLog, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)); } private void writeLocal(String toWrite) { try { if (stream == null) stream = createStream(); stream.write(toWrite); stream.flush(); } catch (IOException ioe) { // We'll drop the change and log the error to the logger. NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 1, TimeUnit.MINUTES, "Could not write to the log file: {}", ioe); } } public void writeStart(JsonNode statement, Object tag) { final String toWrite = statement.toString() + System.lineSeparator(); loggerService.execute(() -> { rolled.add(tag); writeLocal(toWrite); }); } public void write(JsonNode statement, StrategySummary summary, Object tag) { final String toWrite = statement.toString() + System.lineSeparator(); loggerService.execute(() -> { if (!rolled.contains(tag)) { writeLocal(summary.getSummary().toString() + System.lineSeparator()); rolled.add(tag); } writeLocal(toWrite); }); } } }