/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.index;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimitedIndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;

A MergeScheduler that runs each merge using a separate thread.

Specify the max number of threads that may run at once, and the maximum number of simultaneous merges with setMaxMergesAndThreads.

If the number of merges exceeds the max number of threads then the largest merges are paused until one of the smaller merges completes.

If more than getMaxMergeCount merges are requested then this class will forcefully throttle the incoming threads by pausing until one more merges complete.

This class attempts to detect whether the index is on rotational storage (traditional hard drive) or not (e.g. solid-state disk) and changes the default max merge and thread count accordingly. This detection is currently Linux-only, and relies on the OS to put the right value into /sys/block/<dev>/block/rotational. For all other operating systems it currently assumes a rotational disk for backwards compatibility. To enable default settings for spinning or solid state disks for such operating systems, use setDefaultMaxMergesAndThreads(boolean).

/** A {@link MergeScheduler} that runs each merge using a * separate thread. * * <p>Specify the max number of threads that may run at * once, and the maximum number of simultaneous merges * with {@link #setMaxMergesAndThreads}.</p> * * <p>If the number of merges exceeds the max number of threads * then the largest merges are paused until one of the smaller * merges completes.</p> * * <p>If more than {@link #getMaxMergeCount} merges are * requested then this class will forcefully throttle the * incoming threads by pausing until one more merges * complete.</p> * * <p>This class attempts to detect whether the index is * on rotational storage (traditional hard drive) or not * (e.g. solid-state disk) and changes the default max merge * and thread count accordingly. This detection is currently * Linux-only, and relies on the OS to put the right value * into /sys/block/&lt;dev&gt;/block/rotational. For all * other operating systems it currently assumes a rotational * disk for backwards compatibility. To enable default * settings for spinning or solid state disks for such * operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}. */
public class ConcurrentMergeScheduler extends MergeScheduler {
Dynamic default for maxThreadCount and maxMergeCount, used to detect whether the index is backed by an SSD or rotational disk and set maxThreadCount accordingly. If it's an SSD, maxThreadCount is set to max(1, min(4, cpuCoreCount/2)), otherwise 1. Note that detection only currently works on Linux; other platforms will assume the index is not on an SSD.
/** Dynamic default for {@code maxThreadCount} and {@code maxMergeCount}, * used to detect whether the index is backed by an SSD or rotational disk and * set {@code maxThreadCount} accordingly. If it's an SSD, * {@code maxThreadCount} is set to {@code max(1, min(4, cpuCoreCount/2))}, * otherwise 1. Note that detection only currently works on * Linux; other platforms will assume the index is not on an SSD. */
public static final int AUTO_DETECT_MERGES_AND_THREADS = -1;
Used for testing.
@lucene.internal
/** Used for testing. * * @lucene.internal */
public static final String DEFAULT_CPU_CORE_COUNT_PROPERTY = "lucene.cms.override_core_count";
Used for testing.
@lucene.internal
/** Used for testing. * * @lucene.internal */
public static final String DEFAULT_SPINS_PROPERTY = "lucene.cms.override_spins";
List of currently active MergeThreads.
/** List of currently active {@link MergeThread}s. */
protected final List<MergeThread> mergeThreads = new ArrayList<>(); // Max number of merge threads allowed to be running at // once. When there are more merges then this, we // forcefully pause the larger ones, letting the smaller // ones run, up until maxMergeCount merges at which point // we forcefully pause incoming threads (that presumably // are the ones causing so much merging). private int maxThreadCount = AUTO_DETECT_MERGES_AND_THREADS; // Max number of merges we accept before forcefully // throttling the incoming threads private int maxMergeCount = AUTO_DETECT_MERGES_AND_THREADS;
How many MergeThreads have kicked off (this is use to name them).
/** How many {@link MergeThread}s have kicked off (this is use * to name them). */
protected int mergeThreadCount;
Floor for IO write rate limit (we will never go any lower than this)
/** Floor for IO write rate limit (we will never go any lower than this) */
private static final double MIN_MERGE_MB_PER_SEC = 5.0;
Ceiling for IO write rate limit (we will never go any higher than this)
/** Ceiling for IO write rate limit (we will never go any higher than this) */
private static final double MAX_MERGE_MB_PER_SEC = 10240.0;
Initial value for IO write rate limit when doAutoIOThrottle is true
/** Initial value for IO write rate limit when doAutoIOThrottle is true */
private static final double START_MB_PER_SEC = 20.0;
Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount).
/** Merges below this size are not counted in the maxThreadCount, i.e. they can freely run in their own thread (up until maxMergeCount). */
private static final double MIN_BIG_MERGE_MB = 50.0;
Current IO writes throttle rate
/** Current IO writes throttle rate */
protected double targetMBPerSec = START_MB_PER_SEC;
true if we should rate-limit writes for each merge
/** true if we should rate-limit writes for each merge */
private boolean doAutoIOThrottle = true; private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;
Sole constructor, with all settings set to default values.
/** Sole constructor, with all settings set to default * values. */
public ConcurrentMergeScheduler() { }
Expert: directly set the maximum number of merge threads and simultaneous merges allowed.
Params:
  • maxMergeCount – the max # simultaneous merges that are allowed. If a merge is necessary yet we already have this many threads running, the incoming thread (that is calling add/updateDocument) will block until a merge thread has completed. Note that we will only run the smallest maxThreadCount merges at a time.
  • maxThreadCount – the max # simultaneous merge threads that should be running at once. This must be <= maxMergeCount
/** * Expert: directly set the maximum number of merge threads and * simultaneous merges allowed. * * @param maxMergeCount the max # simultaneous merges that are allowed. * If a merge is necessary yet we already have this many * threads running, the incoming thread (that is calling * add/updateDocument) will block until a merge thread * has completed. Note that we will only run the * smallest <code>maxThreadCount</code> merges at a time. * @param maxThreadCount the max # simultaneous merge threads that should * be running at once. This must be &lt;= <code>maxMergeCount</code> */
public synchronized void setMaxMergesAndThreads(int maxMergeCount, int maxThreadCount) { if (maxMergeCount == AUTO_DETECT_MERGES_AND_THREADS && maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) { // OK this.maxMergeCount = AUTO_DETECT_MERGES_AND_THREADS; this.maxThreadCount = AUTO_DETECT_MERGES_AND_THREADS; } else if (maxMergeCount == AUTO_DETECT_MERGES_AND_THREADS) { throw new IllegalArgumentException("both maxMergeCount and maxThreadCount must be AUTO_DETECT_MERGES_AND_THREADS"); } else if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) { throw new IllegalArgumentException("both maxMergeCount and maxThreadCount must be AUTO_DETECT_MERGES_AND_THREADS"); } else { if (maxThreadCount < 1) { throw new IllegalArgumentException("maxThreadCount should be at least 1"); } if (maxMergeCount < 1) { throw new IllegalArgumentException("maxMergeCount should be at least 1"); } if (maxThreadCount > maxMergeCount) { throw new IllegalArgumentException("maxThreadCount should be <= maxMergeCount (= " + maxMergeCount + ")"); } this.maxThreadCount = maxThreadCount; this.maxMergeCount = maxMergeCount; } }
Sets max merges and threads to proper defaults for rotational or non-rotational storage.
Params:
  • spins – true to set defaults best for traditional rotatational storage (spinning disks), else false (e.g. for solid-state disks)
/** Sets max merges and threads to proper defaults for rotational * or non-rotational storage. * * @param spins true to set defaults best for traditional rotatational storage (spinning disks), * else false (e.g. for solid-state disks) */
public synchronized void setDefaultMaxMergesAndThreads(boolean spins) { if (spins) { maxThreadCount = 1; maxMergeCount = 6; } else { int coreCount = Runtime.getRuntime().availableProcessors(); // Let tests override this to help reproducing a failure on a machine that has a different // core count than the one where the test originally failed: try { String value = System.getProperty(DEFAULT_CPU_CORE_COUNT_PROPERTY); if (value != null) { coreCount = Integer.parseInt(value); } } catch (Throwable ignored) { } maxThreadCount = Math.max(1, Math.min(4, coreCount/2)); maxMergeCount = maxThreadCount+5; } }
Set the per-merge IO throttle rate for forced merges (default: Double.POSITIVE_INFINITY).
/** Set the per-merge IO throttle rate for forced merges (default: {@code Double.POSITIVE_INFINITY}). */
public synchronized void setForceMergeMBPerSec(double v) { forceMergeMBPerSec = v; updateMergeThreads(); }
Get the per-merge IO throttle rate for forced merges.
/** Get the per-merge IO throttle rate for forced merges. */
public synchronized double getForceMergeMBPerSec() { return forceMergeMBPerSec; }
Turn on dynamic IO throttling, to adaptively rate limit writes bytes/sec to the minimal rate necessary so merges do not fall behind. By default this is enabled.
/** Turn on dynamic IO throttling, to adaptively rate limit writes * bytes/sec to the minimal rate necessary so merges do not fall behind. * By default this is enabled. */
public synchronized void enableAutoIOThrottle() { doAutoIOThrottle = true; targetMBPerSec = START_MB_PER_SEC; updateMergeThreads(); }
Turn off auto IO throttling.
See Also:
  • enableAutoIOThrottle
/** Turn off auto IO throttling. * * @see #enableAutoIOThrottle */
public synchronized void disableAutoIOThrottle() { doAutoIOThrottle = false; updateMergeThreads(); }
Returns true if auto IO throttling is currently enabled.
/** Returns true if auto IO throttling is currently enabled. */
public synchronized boolean getAutoIOThrottle() { return doAutoIOThrottle; }
Returns the currently set per-merge IO writes rate limit, if enableAutoIOThrottle was called, else Double.POSITIVE_INFINITY.
/** Returns the currently set per-merge IO writes rate limit, if {@link #enableAutoIOThrottle} * was called, else {@code Double.POSITIVE_INFINITY}. */
public synchronized double getIORateLimitMBPerSec() { if (doAutoIOThrottle) { return targetMBPerSec; } else { return Double.POSITIVE_INFINITY; } }
Returns maxThreadCount.
See Also:
/** Returns {@code maxThreadCount}. * * @see #setMaxMergesAndThreads(int, int) */
public synchronized int getMaxThreadCount() { return maxThreadCount; } /** See {@link #setMaxMergesAndThreads}. */ public synchronized int getMaxMergeCount() { return maxMergeCount; }
Removes the calling thread from the active merge threads.
/** Removes the calling thread from the active merge threads. */
synchronized void removeMergeThread() { Thread currentThread = Thread.currentThread(); // Paranoia: don't trust Thread.equals: for(int i=0;i<mergeThreads.size();i++) { if (mergeThreads.get(i) == currentThread) { mergeThreads.remove(i); return; } } assert false: "merge thread " + currentThread + " was not found"; } @Override public Directory wrapForMerge(OneMerge merge, Directory in) { Thread mergeThread = Thread.currentThread(); if (!MergeThread.class.isInstance(mergeThread)) { throw new AssertionError("wrapForMerge should be called from MergeThread. Current thread: " + mergeThread); } // Return a wrapped Directory which has rate-limited output. RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter; return new FilterDirectory(in) { @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); // This Directory is only supposed to be used during merging, // so all writes should have MERGE context, else there is a bug // somewhere that is failing to pass down the right IOContext: assert context.context == IOContext.Context.MERGE: "got context=" + context.context; // Because rateLimiter is bound to a particular merge thread, this method should // always be called from that context. Verify this. assert mergeThread == Thread.currentThread() : "Not the same merge thread, current=" + Thread.currentThread() + ", expected=" + mergeThread; return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context)); } }; }
Called whenever the running merges have changed, to set merge IO limits. This method sorts the merge threads by their merge size in descending order and then pauses/unpauses threads from first to last -- that way, smaller merges are guaranteed to run before larger ones.
/** * Called whenever the running merges have changed, to set merge IO limits. * This method sorts the merge threads by their merge size in * descending order and then pauses/unpauses threads from first to last -- * that way, smaller merges are guaranteed to run before larger ones. */
protected synchronized void updateMergeThreads() { // Only look at threads that are alive & not in the // process of stopping (ie have an active merge): final List<MergeThread> activeMerges = new ArrayList<>(); int threadIdx = 0; while (threadIdx < mergeThreads.size()) { final MergeThread mergeThread = mergeThreads.get(threadIdx); if (!mergeThread.isAlive()) { // Prune any dead threads mergeThreads.remove(threadIdx); continue; } activeMerges.add(mergeThread); threadIdx++; } // Sort the merge threads, largest first: CollectionUtil.timSort(activeMerges); final int activeMergeCount = activeMerges.size(); int bigMergeCount = 0; for (threadIdx=activeMergeCount-1;threadIdx>=0;threadIdx--) { MergeThread mergeThread = activeMerges.get(threadIdx); if (mergeThread.merge.estimatedMergeBytes > MIN_BIG_MERGE_MB*1024*1024) { bigMergeCount = 1+threadIdx; break; } } long now = System.nanoTime(); StringBuilder message; if (verbose()) { message = new StringBuilder(); message.append(String.format(Locale.ROOT, "updateMergeThreads ioThrottle=%s targetMBPerSec=%.1f MB/sec", doAutoIOThrottle, targetMBPerSec)); } else { message = null; } for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) { MergeThread mergeThread = activeMerges.get(threadIdx); OneMerge merge = mergeThread.merge; // pause the thread if maxThreadCount is smaller than the number of merge threads. final boolean doPause = threadIdx < bigMergeCount - maxThreadCount; double newMBPerSec; if (doPause) { newMBPerSec = 0.0; } else if (merge.maxNumSegments != -1) { newMBPerSec = forceMergeMBPerSec; } else if (doAutoIOThrottle == false) { newMBPerSec = Double.POSITIVE_INFINITY; } else if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB*1024*1024) { // Don't rate limit small merges: newMBPerSec = Double.POSITIVE_INFINITY; } else { newMBPerSec = targetMBPerSec; } MergeRateLimiter rateLimiter = mergeThread.rateLimiter; double curMBPerSec = rateLimiter.getMBPerSec(); if (verbose()) { long mergeStartNS = merge.mergeStartNS; if (mergeStartNS == -1) { // IndexWriter didn't start the merge yet: mergeStartNS = now; } message.append('\n'); message.append(String.format(Locale.ROOT, "merge thread %s estSize=%.1f MB (written=%.1f MB) runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s\n", mergeThread.getName(), bytesToMB(merge.estimatedMergeBytes), bytesToMB(rateLimiter.getTotalBytesWritten()), nsToSec(now - mergeStartNS), nsToSec(rateLimiter.getTotalStoppedNS()), nsToSec(rateLimiter.getTotalPausedNS()), rateToString(rateLimiter.getMBPerSec()))); if (newMBPerSec != curMBPerSec) { if (newMBPerSec == 0.0) { message.append(" now stop"); } else if (curMBPerSec == 0.0) { if (newMBPerSec == Double.POSITIVE_INFINITY) { message.append(" now resume"); } else { message.append(String.format(Locale.ROOT, " now resume to %.1f MB/sec", newMBPerSec)); } } else { message.append(String.format(Locale.ROOT, " now change from %.1f MB/sec to %.1f MB/sec", curMBPerSec, newMBPerSec)); } } else if (curMBPerSec == 0.0) { message.append(" leave stopped"); } else { message.append(String.format(Locale.ROOT, " leave running at %.1f MB/sec", curMBPerSec)); } } rateLimiter.setMBPerSec(newMBPerSec); } if (verbose()) { message(message.toString()); } } private synchronized void initDynamicDefaults(IndexWriter writer) throws IOException { if (maxThreadCount == AUTO_DETECT_MERGES_AND_THREADS) { boolean spins = IOUtils.spins(writer.getDirectory()); // Let tests override this to help reproducing a failure on a machine that has a different // core count than the one where the test originally failed: try { String value = System.getProperty(DEFAULT_SPINS_PROPERTY); if (value != null) { spins = Boolean.parseBoolean(value); } } catch (Exception ignored) { // that's fine we might hit a SecurityException etc. here just continue } setDefaultMaxMergesAndThreads(spins); if (verbose()) { message("initDynamicDefaults spins=" + spins + " maxThreadCount=" + maxThreadCount + " maxMergeCount=" + maxMergeCount); } } } private static String rateToString(double mbPerSec) { if (mbPerSec == 0.0) { return "stopped"; } else if (mbPerSec == Double.POSITIVE_INFINITY) { return "unlimited"; } else { return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec); } } @Override public void close() { sync(); }
Wait for any running merge threads to finish. This call is not interruptible as used by close().
/** Wait for any running merge threads to finish. This call is not interruptible as used by {@link #close()}. */
public void sync() { boolean interrupted = false; try { while (true) { MergeThread toSync = null; synchronized (this) { for (MergeThread t : mergeThreads) { // In case a merge thread is calling us, don't try to sync on // itself, since that will never finish! if (t.isAlive() && t != Thread.currentThread()) { toSync = t; break; } } } if (toSync != null) { try { toSync.join(); } catch (InterruptedException ie) { // ignore this Exception, we will retry until all threads are dead interrupted = true; } } else { break; } } } finally { // finally, restore interrupt status: if (interrupted) Thread.currentThread().interrupt(); } }
Returns the number of merge threads that are alive, ignoring the calling thread if it is a merge thread. Note that this number is ≤ mergeThreads size.
@lucene.internal
/** * Returns the number of merge threads that are alive, ignoring the calling thread * if it is a merge thread. Note that this number is &le; {@link #mergeThreads} size. * * @lucene.internal */
public synchronized int mergeThreadCount() { Thread currentThread = Thread.currentThread(); int count = 0; for (MergeThread mergeThread : mergeThreads) { if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.isAborted() == false) { count++; } } return count; } @Override public synchronized void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { assert !Thread.holdsLock(writer); initDynamicDefaults(writer); if (trigger == MergeTrigger.CLOSING) { // Disable throttling on close: targetMBPerSec = MAX_MERGE_MB_PER_SEC; updateMergeThreads(); } // First, quickly run through the newly proposed merges // and add any orthogonal merges (ie a merge not // involving segments already pending to be merged) to // the queue. If we are way behind on merging, many of // these newly proposed merges will likely already be // registered. if (verbose()) { message("now merge"); message(" index: " + writer.segString()); } // Iterate, pulling from the IndexWriter's queue of // pending merges, until it's empty: while (true) { if (maybeStall(writer) == false) { break; } OneMerge merge = writer.getNextMerge(); if (merge == null) { if (verbose()) { message(" no more merges pending; now return"); } return; } boolean success = false; try { if (verbose()) { message(" consider merge " + writer.segString(merge.segments)); } // OK to spawn a new merge thread to handle this // merge: final MergeThread newMergeThread = getMergeThread(writer, merge); mergeThreads.add(newMergeThread); updateIOThrottle(newMergeThread.merge, newMergeThread.rateLimiter); if (verbose()) { message(" launch new thread [" + newMergeThread.getName() + "]"); } newMergeThread.start(); updateMergeThreads(); success = true; } finally { if (!success) { writer.mergeFinish(merge); } } } }
This is invoked by merge to possibly stall the incoming thread when there are too many merges running or pending. The default behavior is to force this thread, which is producing too many segments for merging to keep up, to wait until merges catch up. Applications that can take other less drastic measures, such as limiting how many threads are allowed to index, can do nothing here and throttle elsewhere. If this method wants to stall but the calling thread is a merge thread, it should return false to tell caller not to kick off any new merges.
/** This is invoked by {@link #merge} to possibly stall the incoming * thread when there are too many merges running or pending. The * default behavior is to force this thread, which is producing too * many segments for merging to keep up, to wait until merges catch * up. Applications that can take other less drastic measures, such * as limiting how many threads are allowed to index, can do nothing * here and throttle elsewhere. * * If this method wants to stall but the calling thread is a merge * thread, it should return false to tell caller not to kick off * any new merges. */
protected synchronized boolean maybeStall(IndexWriter writer) { long startStallTime = 0; while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) { // This means merging has fallen too far behind: we // have already created maxMergeCount threads, and // now there's at least one more merge pending. // Note that only maxThreadCount of // those created merge threads will actually be // running; the rest will be paused (see // updateMergeThreads). We stall this producer // thread to prevent creation of new segments, // until merging has caught up: if (mergeThreads.contains(Thread.currentThread())) { // Never stall a merge thread since this blocks the thread from // finishing and calling updateMergeThreads, and blocking it // accomplishes nothing anyway (it's not really a segment producer): return false; } if (verbose() && startStallTime == 0) { message(" too many merges; stalling..."); } startStallTime = System.currentTimeMillis(); doStall(); } if (verbose() && startStallTime != 0) { message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec"); } return true; }
Called from maybeStall to pause the calling thread for a bit.
/** Called from {@link #maybeStall} to pause the calling thread for a bit. */
protected synchronized void doStall() { try { // Defensively wait for only .25 seconds in case we are missing a .notify/All somewhere: wait(250); } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } }
Does the actual merge, by calling IndexWriter.merge
/** Does the actual merge, by calling {@link IndexWriter#merge} */
protected void doMerge(IndexWriter writer, OneMerge merge) throws IOException { writer.merge(merge); }
Create and return a new MergeThread
/** Create and return a new MergeThread */
protected synchronized MergeThread getMergeThread(IndexWriter writer, OneMerge merge) throws IOException { final MergeThread thread = new MergeThread(writer, merge); thread.setDaemon(true); thread.setName("Lucene Merge Thread #" + mergeThreadCount++); return thread; }
Runs a merge thread to execute a single merge, then exits.
/** Runs a merge thread to execute a single merge, then exits. */
protected class MergeThread extends Thread implements Comparable<MergeThread> { final IndexWriter writer; final OneMerge merge; final MergeRateLimiter rateLimiter;
Sole constructor.
/** Sole constructor. */
public MergeThread(IndexWriter writer, OneMerge merge) { this.writer = writer; this.merge = merge; this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); } @Override public int compareTo(MergeThread other) { // Larger merges sort first: return Long.compare(other.merge.estimatedMergeBytes, merge.estimatedMergeBytes); } @Override public void run() { try { if (verbose()) { message(" merge thread: start"); } doMerge(writer, merge); if (verbose()) { message(" merge thread: done"); } // Let CMS run new merges if necessary: try { merge(writer, MergeTrigger.MERGE_FINISHED, true); } catch (AlreadyClosedException ace) { // OK } catch (IOException ioe) { throw new RuntimeException(ioe); } } catch (Throwable exc) { if (exc instanceof MergePolicy.MergeAbortedException) { // OK to ignore } else if (suppressExceptions == false) { // suppressExceptions is normally only set during // testing. handleMergeException(writer.getDirectory(), exc); } } finally { synchronized(ConcurrentMergeScheduler.this) { removeMergeThread(); updateMergeThreads(); // In case we had stalled indexing, we can now wake up // and possibly unstall: ConcurrentMergeScheduler.this.notifyAll(); } } } }
Called when an exception is hit in a background merge thread
/** Called when an exception is hit in a background merge * thread */
protected void handleMergeException(Directory dir, Throwable exc) { throw new MergePolicy.MergeException(exc, dir); } private boolean suppressExceptions;
Used for testing
/** Used for testing */
void setSuppressExceptions() { if (verbose()) { message("will suppress merge exceptions"); } suppressExceptions = true; }
Used for testing
/** Used for testing */
void clearSuppressExceptions() { if (verbose()) { message("will not suppress merge exceptions"); } suppressExceptions = false; } @Override public String toString() { StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); sb.append("maxThreadCount=").append(maxThreadCount).append(", "); sb.append("maxMergeCount=").append(maxMergeCount).append(", "); sb.append("ioThrottle=").append(doAutoIOThrottle); return sb.toString(); } private boolean isBacklog(long now, OneMerge merge) { double mergeMB = bytesToMB(merge.estimatedMergeBytes); for (MergeThread mergeThread : mergeThreads) { long mergeStartNS = mergeThread.merge.mergeStartNS; if (mergeThread.isAlive() && mergeThread.merge != merge && mergeStartNS != -1 && mergeThread.merge.estimatedMergeBytes >= MIN_BIG_MERGE_MB*1024*1024 && nsToSec(now-mergeStartNS) > 3.0) { double otherMergeMB = bytesToMB(mergeThread.merge.estimatedMergeBytes); double ratio = otherMergeMB / mergeMB; if (ratio > 0.3 && ratio < 3.0) { return true; } } } return false; }
Tunes IO throttle when a new merge starts.
/** Tunes IO throttle when a new merge starts. */
private synchronized void updateIOThrottle(OneMerge newMerge, MergeRateLimiter rateLimiter) throws IOException { if (doAutoIOThrottle == false) { return; } double mergeMB = bytesToMB(newMerge.estimatedMergeBytes); if (mergeMB < MIN_BIG_MERGE_MB) { // Only watch non-trivial merges for throttling; this is safe because the MP must eventually // have to do larger merges: return; } long now = System.nanoTime(); // Simplistic closed-loop feedback control: if we find any other similarly // sized merges running, then we are falling behind, so we bump up the // IO throttle, else we lower it: boolean newBacklog = isBacklog(now, newMerge); boolean curBacklog = false; if (newBacklog == false) { if (mergeThreads.size() > maxThreadCount) { // If there are already more than the maximum merge threads allowed, count that as backlog: curBacklog = true; } else { // Now see if any still-running merges are backlog'd: for (MergeThread mergeThread : mergeThreads) { if (isBacklog(now, mergeThread.merge)) { curBacklog = true; break; } } } } double curMBPerSec = targetMBPerSec; if (newBacklog) { // This new merge adds to the backlog: increase IO throttle by 20% targetMBPerSec *= 1.20; if (targetMBPerSec > MAX_MERGE_MB_PER_SEC) { targetMBPerSec = MAX_MERGE_MB_PER_SEC; } if (verbose()) { if (curMBPerSec == targetMBPerSec) { message(String.format(Locale.ROOT, "io throttle: new merge backlog; leave IO rate at ceiling %.1f MB/sec", targetMBPerSec)); } else { message(String.format(Locale.ROOT, "io throttle: new merge backlog; increase IO rate to %.1f MB/sec", targetMBPerSec)); } } } else if (curBacklog) { // We still have an existing backlog; leave the rate as is: if (verbose()) { message(String.format(Locale.ROOT, "io throttle: current merge backlog; leave IO rate at %.1f MB/sec", targetMBPerSec)); } } else { // We are not falling behind: decrease IO throttle by 10% targetMBPerSec /= 1.10; if (targetMBPerSec < MIN_MERGE_MB_PER_SEC) { targetMBPerSec = MIN_MERGE_MB_PER_SEC; } if (verbose()) { if (curMBPerSec == targetMBPerSec) { message(String.format(Locale.ROOT, "io throttle: no merge backlog; leave IO rate at floor %.1f MB/sec", targetMBPerSec)); } else { message(String.format(Locale.ROOT, "io throttle: no merge backlog; decrease IO rate to %.1f MB/sec", targetMBPerSec)); } } } double rate; if (newMerge.maxNumSegments != -1) { rate = forceMergeMBPerSec; } else { rate = targetMBPerSec; } rateLimiter.setMBPerSec(rate); targetMBPerSecChanged(); }
Subclass can override to tweak targetMBPerSec.
/** Subclass can override to tweak targetMBPerSec. */
protected void targetMBPerSecChanged() { } private static double nsToSec(long ns) { return ns / 1000000000.0; } private static double bytesToMB(long bytes) { return bytes/1024./1024.; } }