/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.store;

import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

import org.apache.lucene.store.IOContext.Context;
import org.apache.lucene.util.SuppressForbidden;

// TODO
//   - newer Linux kernel versions (after 2.6.29) have
//     improved MADV_SEQUENTIAL (and hopefully also
//     FADV_SEQUENTIAL) interaction with the buffer
//     cache; we should explore using that instead of direct
//     IO when context is merge

A Directory implementation for all Unixes that uses DIRECT I/O to bypass OS level IO caching during merging. For all other cases (searching, writing) we delegate to the provided Directory instance.

See Overview for more details.

To use this you must compile NativePosixUtil.cpp (exposes Linux-specific APIs through JNI) for your platform, by running ant build-native-unix, and then putting the resulting libNativePosixUtil.so (from lucene/build/native) onto your dynamic linker search path.

WARNING: this code is very new and could easily contain horrible bugs. For example, here's one known issue: if you use seek in IndexOutput and then write more than one buffer's worth of bytes, the file will be wrong. Lucene does not do this today (it only writes a small number of bytes after seek), but that may change.

This directory passes Solr and Lucene tests on Linux and OS X; other Unixes should work but have not been tested! Use at your own risk.

@lucene.experimental
/** * A {@link Directory} implementation for all Unixes that uses * DIRECT I/O to bypass OS level IO caching during * merging. For all other cases (searching, writing) we delegate * to the provided Directory instance. * * <p>See <a * href="{@docRoot}/overview-summary.html#NativeUnixDirectory">Overview</a> * for more details. * * <p>To use this you must compile * NativePosixUtil.cpp (exposes Linux-specific APIs through * JNI) for your platform, by running <code>ant * build-native-unix</code>, and then putting the resulting * <code>libNativePosixUtil.so</code> (from * <code>lucene/build/native</code>) onto your dynamic * linker search path. * * <p><b>WARNING</b>: this code is very new and quite easily * could contain horrible bugs. For example, here's one * known issue: if you use seek in <code>IndexOutput</code>, and then * write more than one buffer's worth of bytes, then the * file will be wrong. Lucene does not do this today (only writes * small number of bytes after seek), but that may change. * * <p>This directory passes Solr and Lucene tests on Linux * and OS X; other Unixes should work but have not been * tested! Use at your own risk. * * @lucene.experimental */
public class NativeUnixDirectory extends FSDirectory { // TODO: this is OS dependent, but likely 512 is the LCD private final static long ALIGN = 512; private final static long ALIGN_NOT_MASK = ~(ALIGN-1);
Default buffer size before writing to disk (256 KB); larger means less IO load but more RAM and direct buffer storage space consumed during merging.
/** Default buffer size before writing to disk (256 KB); * larger means less IO load but more RAM and direct * buffer storage space consumed during merging. */
public final static int DEFAULT_MERGE_BUFFER_SIZE = 262144;
Default min expected merge size before direct IO is used (10 MB):
/** Default min expected merge size before direct IO is * used (10 MB): */
public final static long DEFAULT_MIN_BYTES_DIRECT = 10*1024*1024; private final int mergeBufferSize; private final long minBytesDirect; private final Directory delegate;
Create a new NIOFSDirectory for the named location.
Params:
  • path – the path of the directory
  • lockFactory – to use
  • mergeBufferSize – Size of buffer to use for merging. See DEFAULT_MERGE_BUFFER_SIZE.
  • minBytesDirect – Merges, or files to be opened for reading, smaller than this will not use direct IO. See DEFAULT_MIN_BYTES_DIRECT
  • delegate – fallback Directory for non-merges
Throws:
/** Create a new NIOFSDirectory for the named location. * * @param path the path of the directory * @param lockFactory to use * @param mergeBufferSize Size of buffer to use for * merging. See {@link #DEFAULT_MERGE_BUFFER_SIZE}. * @param minBytesDirect Merges, or files to be opened for * reading, smaller than this will * not use direct IO. See {@link * #DEFAULT_MIN_BYTES_DIRECT} * @param delegate fallback Directory for non-merges * @throws IOException If there is a low-level I/O error */
public NativeUnixDirectory(Path path, int mergeBufferSize, long minBytesDirect, LockFactory lockFactory, Directory delegate) throws IOException { super(path, lockFactory); if ((mergeBufferSize & ALIGN) != 0) { throw new IllegalArgumentException("mergeBufferSize must be 0 mod " + ALIGN + " (got: " + mergeBufferSize + ")"); } this.mergeBufferSize = mergeBufferSize; this.minBytesDirect = minBytesDirect; this.delegate = delegate; }
Create a new NIOFSDirectory for the named location.
Params:
  • path – the path of the directory
  • lockFactory – the lock factory to use
  • delegate – fallback Directory for non-merges
Throws:
/** Create a new NIOFSDirectory for the named location. * * @param path the path of the directory * @param lockFactory the lock factory to use * @param delegate fallback Directory for non-merges * @throws IOException If there is a low-level I/O error */
public NativeUnixDirectory(Path path, LockFactory lockFactory, Directory delegate) throws IOException { this(path, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT, lockFactory, delegate); }
Create a new NIOFSDirectory for the named location with FSLockFactory.getDefault().
Params:
  • path – the path of the directory
  • delegate – fallback Directory for non-merges
Throws:
/** Create a new NIOFSDirectory for the named location with {@link FSLockFactory#getDefault()}. * * @param path the path of the directory * @param delegate fallback Directory for non-merges * @throws IOException If there is a low-level I/O error */
public NativeUnixDirectory(Path path, Directory delegate) throws IOException { this(path, DEFAULT_MERGE_BUFFER_SIZE, DEFAULT_MIN_BYTES_DIRECT, FSLockFactory.getDefault(), delegate); } @Override public IndexInput openInput(String name, IOContext context) throws IOException { ensureOpen(); if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect || fileLength(name) < minBytesDirect) { return delegate.openInput(name, context); } else { return new NativeUnixIndexInput(getDirectory().resolve(name), mergeBufferSize); } } @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); if (context.context != Context.MERGE || context.mergeInfo.estimatedMergeBytes < minBytesDirect) { return delegate.createOutput(name, context); } else { return new NativeUnixIndexOutput(getDirectory().resolve(name), name, mergeBufferSize); } } @SuppressForbidden(reason = "java.io.File: native API requires old-style FileDescriptor") private final static class NativeUnixIndexOutput extends IndexOutput { private final ByteBuffer buffer; private final FileOutputStream fos; private final FileChannel channel; private final int bufferSize; //private final File path; private int bufferPos; private long filePos; private long fileLength; private boolean isOpen; public NativeUnixIndexOutput(Path path, String name, int bufferSize) throws IOException { super("NativeUnixIndexOutput(path=\"" + path.toString() + "\")", name); //this.path = path; final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), false); fos = new FileOutputStream(fd); //fos = new FileOutputStream(path); channel = fos.getChannel(); buffer = ByteBuffer.allocateDirect(bufferSize); this.bufferSize = bufferSize; isOpen = true; } @Override public void writeByte(byte b) throws IOException { assert bufferPos == buffer.position(): "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position(); buffer.put(b); if (++bufferPos == bufferSize) { 
dump(); } } @Override public void writeBytes(byte[] src, int offset, int len) throws IOException { int toWrite = len; while(true) { final int left = bufferSize - bufferPos; if (left <= toWrite) { buffer.put(src, offset, left); toWrite -= left; offset += left; bufferPos = bufferSize; dump(); } else { buffer.put(src, offset, toWrite); bufferPos += toWrite; break; } } } //@Override //public void setLength() throws IOException { // TODO -- how to impl this? neither FOS nor // FileChannel provides an API? //} private void dump() throws IOException { buffer.flip(); final long limit = filePos + buffer.limit(); if (limit > fileLength) { // this dump extends the file fileLength = limit; } else { // we had seek'd back & wrote some changes } // must always round to next block buffer.limit((int) ((buffer.limit() + ALIGN - 1) & ALIGN_NOT_MASK)); assert (buffer.limit() & ALIGN_NOT_MASK) == buffer.limit() : "limit=" + buffer.limit() + " vs " + (buffer.limit() & ALIGN_NOT_MASK); assert (filePos & ALIGN_NOT_MASK) == filePos; //System.out.println(Thread.currentThread().getName() + ": dump to " + filePos + " limit=" + buffer.limit() + " fos=" + fos); channel.write(buffer, filePos); filePos += bufferPos; bufferPos = 0; buffer.clear(); //System.out.println("dump: done"); // TODO: the case where we'd seek'd back, wrote an // entire buffer, we must here read the next buffer; // likely Lucene won't trip on this since we only // write smallish amounts on seeking back } @Override public long getFilePointer() { return filePos + bufferPos; } @Override public long getChecksum() throws IOException { throw new UnsupportedOperationException("this directory currently does not work at all!"); } @Override public void close() throws IOException { if (isOpen) { isOpen = false; try { dump(); } finally { try { //System.out.println("direct close set len=" + fileLength + " vs " + channel.size() + " path=" + path); channel.truncate(fileLength); //System.out.println(" now: " + channel.size()); } finally { 
try { channel.close(); } finally { fos.close(); //System.out.println(" final len=" + path.length()); } } } } } } @SuppressForbidden(reason = "java.io.File: native API requires old-style FileDescriptor") private final static class NativeUnixIndexInput extends IndexInput { private final ByteBuffer buffer; private final FileInputStream fis; private final FileChannel channel; private final int bufferSize; private boolean isOpen; private boolean isClone; private long filePos; private int bufferPos; public NativeUnixIndexInput(Path path, int bufferSize) throws IOException { super("NativeUnixIndexInput(path=\"" + path + "\")"); final FileDescriptor fd = NativePosixUtil.open_direct(path.toString(), true); fis = new FileInputStream(fd); channel = fis.getChannel(); this.bufferSize = bufferSize; buffer = ByteBuffer.allocateDirect(bufferSize); isOpen = true; isClone = false; filePos = -bufferSize; bufferPos = bufferSize; //System.out.println("D open " + path + " this=" + this); } // for clone public NativeUnixIndexInput(NativeUnixIndexInput other) throws IOException { super(other.toString()); this.fis = null; channel = other.channel; this.bufferSize = other.bufferSize; buffer = ByteBuffer.allocateDirect(bufferSize); filePos = -bufferSize; bufferPos = bufferSize; isOpen = true; isClone = true; //System.out.println("D clone this=" + this); seek(other.getFilePointer()); } @Override public void close() throws IOException { if (isOpen && !isClone) { try { channel.close(); } finally { if (!isClone) { fis.close(); } } } } @Override public long getFilePointer() { return filePos + bufferPos; } @Override public void seek(long pos) throws IOException { if (pos != getFilePointer()) { final long alignedPos = pos & ALIGN_NOT_MASK; filePos = alignedPos-bufferSize; final int delta = (int) (pos - alignedPos); if (delta != 0) { refill(); buffer.position(delta); bufferPos = delta; } else { // force refill on next read bufferPos = bufferSize; } } } @Override public long length() { try { return 
channel.size(); } catch (IOException ioe) { throw new RuntimeException("IOException during length(): " + this, ioe); } } @Override public byte readByte() throws IOException { // NOTE: we don't guard against EOF here... ie the // "final" buffer will typically be filled to less // than bufferSize if (bufferPos == bufferSize) { refill(); } assert bufferPos == buffer.position() : "bufferPos=" + bufferPos + " vs buffer.position()=" + buffer.position(); bufferPos++; return buffer.get(); } private void refill() throws IOException { buffer.clear(); filePos += bufferSize; bufferPos = 0; assert (filePos & ALIGN_NOT_MASK) == filePos : "filePos=" + filePos + " anded=" + (filePos & ALIGN_NOT_MASK); //System.out.println("X refill filePos=" + filePos); int n; try { n = channel.read(buffer, filePos); } catch (IOException ioe) { throw new IOException(ioe.getMessage() + ": " + this, ioe); } if (n < 0) { throw new EOFException("read past EOF: " + this); } buffer.rewind(); } @Override public void readBytes(byte[] dst, int offset, int len) throws IOException { int toRead = len; //System.out.println("\nX readBytes len=" + len + " fp=" + getFilePointer() + " size=" + length() + " this=" + this); while(true) { final int left = bufferSize - bufferPos; if (left < toRead) { //System.out.println(" copy " + left); buffer.get(dst, offset, left); toRead -= left; offset += left; refill(); } else { //System.out.println(" copy " + toRead); buffer.get(dst, offset, toRead); bufferPos += toRead; //System.out.println(" readBytes done"); break; } } } @Override public NativeUnixIndexInput clone() { try { return new NativeUnixIndexInput(this); } catch (IOException ioe) { throw new RuntimeException("IOException during clone: " + this, ioe); } } @Override public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { // TODO: is this the right thing to do? return BufferedIndexInput.wrap(sliceDescription, this, offset, length); } } }