/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.store;


import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.IOUtils;

// TODO
//   - let subclass dictate policy...?
//   - rename to MergeCacheingDir?  NRTCachingDir

Wraps a RAMDirectory around any provided delegate directory, to be used during NRT search.

This class is likely only useful in a near-real-time context, where indexing rate is lowish but reopen rate is highish, resulting in many tiny files being written. This directory keeps such segments (as well as the segments produced by merging them, as long as they are small enough), in RAM.

This is safe to use: when your app calls {IndexWriter#commit}, all cached files will be flushed from the cached and sync'd.

Here's a simple example usage:

  Directory fsDir = FSDirectory.open(new File("/path/to/index").toPath());
  NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0);
  IndexWriterConfig conf = new IndexWriterConfig(analyzer);
  IndexWriter writer = new IndexWriter(cachedFSDir, conf);

This will cache all newly flushed segments, all merges whose expected segment size is <= 5 MB, unless the net cached bytes exceeds 60 MB at which point all writes will not be cached (until the net bytes falls below 60 MB).

@lucene.experimental
/** * Wraps a {@link RAMDirectory} * around any provided delegate directory, to * be used during NRT search. * * <p>This class is likely only useful in a near-real-time * context, where indexing rate is lowish but reopen * rate is highish, resulting in many tiny files being * written. This directory keeps such segments (as well as * the segments produced by merging them, as long as they * are small enough), in RAM.</p> * * <p>This is safe to use: when your app calls {IndexWriter#commit}, * all cached files will be flushed from the cached and sync'd.</p> * * <p>Here's a simple example usage: * * <pre class="prettyprint"> * Directory fsDir = FSDirectory.open(new File("/path/to/index").toPath()); * NRTCachingDirectory cachedFSDir = new NRTCachingDirectory(fsDir, 5.0, 60.0); * IndexWriterConfig conf = new IndexWriterConfig(analyzer); * IndexWriter writer = new IndexWriter(cachedFSDir, conf); * </pre> * * <p>This will cache all newly flushed segments, all merges * whose expected segment size is {@code <= 5 MB}, unless the net * cached bytes exceeds 60 MB at which point all writes will * not be cached (until the net bytes falls below 60 MB).</p> * * @lucene.experimental */
public class NRTCachingDirectory extends FilterDirectory implements Accountable { private final RAMDirectory cache = new RAMDirectory(); private final long maxMergeSizeBytes; private final long maxCachedBytes; private static final boolean VERBOSE = false;
We will cache a newly created output if 1) it's a flush or a merge and the estimated size of the merged segment is <= maxMergeSizeMB, and 2) the total cached bytes is <= maxCachedMB
/** * We will cache a newly created output if 1) it's a * flush or a merge and the estimated size of the merged segment is * {@code <= maxMergeSizeMB}, and 2) the total cached bytes is * {@code <= maxCachedMB} */
public NRTCachingDirectory(Directory delegate, double maxMergeSizeMB, double maxCachedMB) { super(delegate); maxMergeSizeBytes = (long) (maxMergeSizeMB*1024*1024); maxCachedBytes = (long) (maxCachedMB*1024*1024); } @Override public String toString() { return "NRTCachingDirectory(" + in + "; maxCacheMB=" + (maxCachedBytes/1024/1024.) + " maxMergeSizeMB=" + (maxMergeSizeBytes/1024/1024.) + ")"; } @Override public synchronized String[] listAll() throws IOException { final Set<String> files = new HashSet<>(); for(String f : cache.listAll()) { files.add(f); } for(String f : in.listAll()) { files.add(f); } String[] result = files.toArray(new String[files.size()]); Arrays.sort(result); return result; } @Override public synchronized void deleteFile(String name) throws IOException { if (VERBOSE) { System.out.println("nrtdir.deleteFile name=" + name); } if (cache.fileNameExists(name)) { cache.deleteFile(name); } else { in.deleteFile(name); } } @Override public synchronized long fileLength(String name) throws IOException { if (cache.fileNameExists(name)) { return cache.fileLength(name); } else { return in.fileLength(name); } } public String[] listCachedFiles() { return cache.listAll(); } @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { if (VERBOSE) { System.out.println("nrtdir.createOutput name=" + name); } if (doCacheWrite(name, context)) { if (VERBOSE) { System.out.println(" to cache"); } return cache.createOutput(name, context); } else { return in.createOutput(name, context); } } @Override public void sync(Collection<String> fileNames) throws IOException { if (VERBOSE) { System.out.println("nrtdir.sync files=" + fileNames); } for(String fileName : fileNames) { unCache(fileName); } in.sync(fileNames); } @Override public void rename(String source, String dest) throws IOException { unCache(source); if (cache.fileNameExists(dest)) { throw new IllegalArgumentException("target file " + dest + " already exists"); } in.rename(source, dest); } @Override public synchronized IndexInput openInput(String name, IOContext context) throws IOException { if (VERBOSE) { System.out.println("nrtdir.openInput name=" + name); } if (cache.fileNameExists(name)) { if (VERBOSE) { System.out.println(" from cache"); } return cache.openInput(name, context); } else { return in.openInput(name, context); } }
Close this directory, which flushes any cached files to the delegate and then closes the delegate.
/** Close this directory, which flushes any cached files * to the delegate and then closes the delegate. */
@Override public void close() throws IOException { // NOTE: technically we shouldn't have to do this, ie, // IndexWriter should have sync'd all files, but we do // it for defensive reasons... or in case the app is // doing something custom (creating outputs directly w/o // using IndexWriter): boolean success = false; try { if (cache.isOpen) { for(String fileName : cache.listAll()) { unCache(fileName); } } success = true; } finally { if (success) { IOUtils.close(cache, in); } else { IOUtils.closeWhileHandlingException(cache, in); } } }
Subclass can override this to customize logic; return true if this file should be written to the RAMDirectory.
/** Subclass can override this to customize logic; return * true if this file should be written to the RAMDirectory. */
protected boolean doCacheWrite(String name, IOContext context) { //System.out.println(Thread.currentThread().getName() + ": CACHE check merge=" + merge + " size=" + (merge==null ? 0 : merge.estimatedMergeBytes)); long bytes = 0; if (context.mergeInfo != null) { bytes = context.mergeInfo.estimatedMergeBytes; } else if (context.flushInfo != null) { bytes = context.flushInfo.estimatedSegmentSize; } return (bytes <= maxMergeSizeBytes) && (bytes + cache.ramBytesUsed()) <= maxCachedBytes; } @Override public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { if (VERBOSE) { System.out.println("nrtdir.createTempOutput prefix=" + prefix + " suffix=" + suffix); } Set<String> toDelete = new HashSet<>(); // This is very ugly/messy/dangerous (can in some disastrous case maybe create too many temp files), but I don't know of a cleaner way: boolean success = false; Directory first; Directory second; if (doCacheWrite(prefix, context)) { first = cache; second = in; } else { first = in; second = cache; } IndexOutput out = null; try { while (true) { out = first.createTempOutput(prefix, suffix, context); String name = out.getName(); toDelete.add(name); if (slowFileExists(second, name)) { out.close(); } else { toDelete.remove(name); success = true; break; } } } finally { if (success) { IOUtils.deleteFiles(first, toDelete); } else { IOUtils.closeWhileHandlingException(out); IOUtils.deleteFilesIgnoringExceptions(first, toDelete); } } return out; }
Returns true if the file exists (can be opened), false if it cannot be opened, and (unlike Java's File.exists) throws IOException if there's some unexpected error.
/** Returns true if the file exists * (can be opened), false if it cannot be opened, and * (unlike Java's File.exists) throws IOException if * there's some unexpected error. */
static boolean slowFileExists(Directory dir, String fileName) throws IOException { try { dir.openInput(fileName, IOContext.DEFAULT).close(); return true; } catch (NoSuchFileException | FileNotFoundException e) { return false; } } private final Object uncacheLock = new Object(); private void unCache(String fileName) throws IOException { // Only let one thread uncache at a time; this only // happens during commit() or close(): synchronized(uncacheLock) { if (VERBOSE) { System.out.println("nrtdir.unCache name=" + fileName); } if (!cache.fileNameExists(fileName)) { // Another thread beat us... return; } assert slowFileExists(in, fileName) == false: "fileName=" + fileName + " exists both in cache and in delegate"; final IOContext context = IOContext.DEFAULT; final IndexOutput out = in.createOutput(fileName, context); IndexInput in = null; try { in = cache.openInput(fileName, context); out.copyBytes(in, in.length()); } finally { IOUtils.close(in, out); } // Lock order: uncacheLock -> this synchronized(this) { // Must sync here because other sync methods have // if (cache.fileNameExists(name)) { ... } else { ... }: cache.deleteFile(fileName); } } } @Override public long ramBytesUsed() { return cache.ramBytesUsed(); } @Override public Collection<Accountable> getChildResources() { return Collections.singleton(Accountables.namedAccountable("cache", cache)); } }