Copyright (c) 2003, 2019 IBM Corporation and others. This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 which accompanies this distribution, and is available at https://www.eclipse.org/legal/epl-2.0/ SPDX-License-Identifier: EPL-2.0 Contributors: IBM Corporation - initial API and implementation Stephan Wahlbrink - Fix for bug 200997. Danail Nachev - Fix for bug 109898 Mike Moreaty - Fix for bug 289790 Oracle Corporation - Fix for bug 316839 Thirumala Reddy Mutchukota (thirumala@google.com) - Bug 432049, JobGroup API and implementation Bug 105821, Support for Job#join with timeout and progress monitor Jan Koehnlein - Fix for bug 60964 (454698) Terry Parker - Bug 457504, Publish a job group's final status to IJobChangeListeners Xored Software Inc - Fix for bug 550738
/******************************************************************************* * Copyright (c) 2003, 2019 IBM Corporation and others. * * This program and the accompanying materials * are made available under the terms of the Eclipse Public License 2.0 * which accompanies this distribution, and is available at * https://www.eclipse.org/legal/epl-2.0/ * * SPDX-License-Identifier: EPL-2.0 * * Contributors: * IBM Corporation - initial API and implementation * Stephan Wahlbrink - Fix for bug 200997. * Danail Nachev - Fix for bug 109898 * Mike Moreaty - Fix for bug 289790 * Oracle Corporation - Fix for bug 316839 * Thirumala Reddy Mutchukota (thirumala@google.com) - * Bug 432049, JobGroup API and implementation * Bug 105821, Support for Job#join with timeout and progress monitor * Jan Koehnlein - Fix for bug 60964 (454698) * Terry Parker - Bug 457504, Publish a job group's final status to IJobChangeListeners * Xored Software Inc - Fix for bug 550738 *******************************************************************************/
package org.eclipse.core.internal.jobs; import java.util.*; import org.eclipse.core.internal.runtime.RuntimeLog; import org.eclipse.core.runtime.*; import org.eclipse.core.runtime.jobs.*; import org.eclipse.osgi.service.debug.*; import org.eclipse.osgi.util.NLS;
Implementation of API type IJobManager Implementation note: all the data structures of this class are protected by a single lock object held as a private field in this class. The JobManager instance itself is not used because this class is publicly reachable, and third party clients may try to synchronize on it. There are various locks used and held throughout the JobManager implementation. When multiple locks interact, circular hold and waits must never happen, or a deadlock will occur. To prevent deadlocks, this is the order that locks must be acquired. WorkerPool -> JobManager.implicitJobs -> JobManager.lock -> InternalJob.jobStateLock or InternalJobGroup.jobGroupStateLock
@ThreadSafe
/** * Implementation of API type IJobManager * * Implementation note: all the data structures of this class are protected by a * single lock object held as a private field in this class. The JobManager * instance itself is not used because this class is publicly reachable, and * third party clients may try to synchronize on it. * * There are various locks used and held throughout the JobManager * implementation. When multiple locks interact, circular hold and waits must * never happen, or a deadlock will occur. To prevent deadlocks, this is the * order that locks must be acquired. * * WorkerPool -> JobManager.implicitJobs -> JobManager.lock -> * InternalJob.jobStateLock or InternalJobGroup.jobGroupStateLock * * @ThreadSafe */
public class JobManager implements IJobManager, DebugOptionsListener {
The unique identifier constant of this plug-in.
/** * The unique identifier constant of this plug-in. */
public static final String PI_JOBS = "org.eclipse.core.jobs"; //$NON-NLS-1$
Status code constant indicating an error occurred while running a plug-in. For backward compatibility with Platform.PLUGIN_ERROR left at (value = 2).
/** * Status code constant indicating an error occurred while running a plug-in. * For backward compatibility with Platform.PLUGIN_ERROR left at (value = 2). */
public static final int PLUGIN_ERROR = 2;
Determines how often the progress monitor is checked for cancellation during the join call.
/**
 * Determines how often the progress monitor is checked for cancellation during the join call.
 * The value is in milliseconds: join compares it against a remaining time that is
 * derived from System.currentTimeMillis().
 */
private static final long MAX_WAIT_INTERVAL = 100;

// Keys of the debug options; every key is prefixed with the plug-in identifier PI_JOBS.
private static final String OPTION_DEADLOCK_ERROR = PI_JOBS + "/jobs/errorondeadlock"; //$NON-NLS-1$
private static final String OPTION_DEBUG_BEGIN_END = PI_JOBS + "/jobs/beginend"; //$NON-NLS-1$
private static final String OPTION_DEBUG_YIELDING = PI_JOBS + "/jobs/yielding"; //$NON-NLS-1$
private static final String OPTION_DEBUG_YIELDING_DETAILED = PI_JOBS + "/jobs/yielding/detailed"; //$NON-NLS-1$
private static final String OPTION_DEBUG_JOBS = PI_JOBS + "/jobs"; //$NON-NLS-1$
private static final String OPTION_LOCKS = PI_JOBS + "/jobs/locks"; //$NON-NLS-1$
private static final String OPTION_SHUTDOWN = PI_JOBS + "/jobs/shutdown"; //$NON-NLS-1$

// Trace sink that debug(String) writes to. It has no initializer in this declaration.
static DebugTrace DEBUG_TRACE;

// Debug switches; all of them start out disabled.
// NOTE(review): presumably these are updated from the OPTION_* keys when the debug
// options change - the code doing so is not part of this section; confirm.
static boolean DEBUG = false;
static boolean DEBUG_BEGIN_END = false;
static boolean DEBUG_YIELDING = false;
static boolean DEBUG_YIELDING_DETAILED = false;
static boolean DEBUG_DEADLOCK = false;
static boolean DEBUG_LOCKS = false;
static boolean DEBUG_SHUTDOWN = false;
The singleton job manager instance. It must be a singleton because all job instances maintain a reference (as an optimization) and have no way of updating it.
/** * The singleton job manager instance. It must be a singleton because * all job instances maintain a reference (as an optimization) and have no way * of updating it. */
private static JobManager instance;
Scheduling rule used for validation of client-defined rules.
/**
 * Scheduling rule used for validation of client-defined rules. Both of its
 * methods use an identity comparison, so this rule contains and conflicts
 * with itself only.
 */
private static final ISchedulingRule nullRule = new ISchedulingRule() {
    @Override
    public boolean contains(ISchedulingRule rule) {
        // only the very same instance is contained
        return rule == this;
    }

    @Override
    public boolean isConflicting(ISchedulingRule rule) {
        // only the very same instance conflicts
        return rule == this;
    }
};
True if this manager is active, and false otherwise. A job manager starts out active, and becomes inactive if it has been shutdown.
/**
 * True if this manager is active, and false otherwise. A job manager
 * starts out active, and becomes inactive if it has been shutdown.
 * The field is volatile, so isActive() can read it without taking the lock.
 */
private volatile boolean active = true;

/**
 * Helper that beginRule and endRule delegate to, and that currentRule asks
 * for the thread job of the calling thread.
 */
final ImplicitJobs implicitJobs = new ImplicitJobs(this);
Listeners for the job lifecycle. It is important that the JobManager#JobGroupUpdater is the first one that is dispatched to, since it updates the JobChangeEvent#jobGroupStatus field, which other listeners may use.
/** * Listeners for the job lifecycle. It is important that the * JobManager#JobGroupUpdater is the first one that is dispatched to, since * it updates the JobChangeEvent#jobGroupStatus field, which other listeners * may use. */
private final JobListeners jobListeners = new JobListeners();
The lock for synchronizing all activity in the job manager. To avoid deadlock, this lock must never be held for extended periods, and must never be held while third party code is being called. @GuardedBy("itself")
/** * The lock for synchronizing all activity in the job manager. To avoid deadlock, * this lock must never be held for extended periods, and must never be * held while third party code is being called. * @GuardedBy("itself") */
private final Object lock = new Object();
A job listener to check for the cancellation and completion of the job groups.
/**
 * A job listener to check for the cancellation and completion of the job groups.
 * It is added to the job listeners by the constructor and removed again by doShutdown.
 */
private final IJobChangeListener jobGroupUpdater = new JobGroupUpdater(lock);

/**
 * The lock manager returned by getLockManager().
 */
private final LockManager lockManager = new LockManager();
The pool of worker threads.
/** * The pool of worker threads. */
private WorkerPool pool;
"lock")
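/** * The provider asked to create progress monitors and progress groups for jobs, or null if none is set. * @GuardedBy("lock") */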
private ProgressProvider progressProvider = null;
Jobs that are currently running. Should only be modified from changeState. @GuardedBy("lock")
/** * Jobs that are currently running. Should only be modified from changeState * @GuardedBy("lock") */
private final HashSet<InternalJob> running;
Jobs that are currently yielding. Should only be modified from changeState. @GuardedBy("lock")
/** * Jobs that are currently yielding. Should only be modified from changeState * @GuardedBy("lock") */
private final HashSet<InternalJob> yielding;
Jobs that are sleeping. Some sleeping jobs are scheduled to wake up at a given start time, while others will sleep indefinitely until woken. Should only be modified from changeState. @GuardedBy("lock")
/** * Jobs that are sleeping. Some sleeping jobs are scheduled to wake * up at a given start time, while others will sleep indefinitely until woken. * Should only be modified from changeState * @GuardedBy("lock") */
private final JobQueue sleeping;
True if this manager has been suspended, and false otherwise. A job manager starts out not suspended, and becomes suspended when suspend is invoked. Once suspended, no jobs will start running until resume is called. @GuardedBy("lock")
/** * True if this manager has been suspended, and false otherwise. A job manager * starts out not suspended, and becomes suspended when <code>suspend</code> * is invoked. Once suspended, no jobs will start running until <code>resume</code> * is called. * @GuardedBy("lock") */
private boolean suspended = false;
Jobs that are waiting to be run. Should only be modified from changeState. @GuardedBy("lock")
/** * Jobs that are waiting to be run. Should only be modified from changeState * @GuardedBy("lock") */
private final JobQueue waiting;
ThreadJobs that are waiting to be run. Should only be modified from changeState. @GuardedBy("lock")
/** * ThreadJobs that are waiting to be run. Should only be modified from changeState * @GuardedBy("lock") */
final JobQueue waitingThreadJobs;
Counter to record wait queue insertion order. @GuardedBy("lock")
/** * Counter to record wait queue insertion order. * @GuardedBy("lock") */
Counter waitQueueCounter = new Counter();
A set of progress monitors we must track cancellation requests for. @GuardedBy("itself")
/**
 * A set of progress monitors we must track cancellation requests for.
 * @GuardedBy("itself")
 */
final List<Object[]> monitorStack = new ArrayList<>();

/**
 * Internal worker that the constructor creates and starts, and that doShutdown cancels.
 */
private final InternalWorker internalWorker;

/**
 * Writes the given message to the DEBUG_TRACE sink, without an option path.
 * NOTE(review): DEBUG_TRACE has no initializer where it is declared, so this
 * presumably must only be called once tracing has been set up - confirm.
 *
 * @param msg the message to trace
 */
public static void debug(String msg) {
    DEBUG_TRACE.trace(null, msg);
}
Returns the job manager singleton. For internal use only.
/**
 * Returns the job manager singleton, creating it when none exists yet.
 * For internal use only. The method is synchronized on the class, so two
 * concurrent callers cannot both construct a manager.
 *
 * @return the value of the static instance field
 */
static synchronized JobManager getInstance() {
    final boolean missing = instance == null;
    if (missing) {
        // the constructor stores the new manager in the static 'instance' field itself
        new JobManager();
    }
    return instance;
}
For debugging purposes only
/** * For debugging purposes only */
private static String printJobName(Job job) { if (job instanceof ThreadJob) { Job realJob = ((ThreadJob) job).realJob; if (realJob != null) return realJob.getClass().getName(); return "ThreadJob on rule: " + job.getRule(); //$NON-NLS-1$ } return job.getClass().getName(); }
For debugging purposes only
/**
 * For debugging purposes only. Returns the printable name of the internal
 * state of the given job.
 */
public static String printState(Job job) {
    final InternalJob internal = (InternalJob) job;
    final int internalState = internal.internalGetState();
    return printState(internalState);
}
For debugging purposes only
/** * For debugging purposes only */
public static String printState(int state) { switch (state) { case Job.NONE : return "NONE"; //$NON-NLS-1$ case Job.WAITING : return "WAITING"; //$NON-NLS-1$ case Job.SLEEPING : return "SLEEPING"; //$NON-NLS-1$ case Job.RUNNING : return "RUNNING"; //$NON-NLS-1$ case InternalJob.BLOCKED : return "BLOCKED"; //$NON-NLS-1$ case InternalJob.YIELDING : return "YIELDING"; //$NON-NLS-1$ case InternalJob.ABOUT_TO_RUN : return "ABOUT_TO_RUN"; //$NON-NLS-1$ case InternalJob.ABOUT_TO_SCHEDULE : return "ABOUT_TO_SCHEDULE";//$NON-NLS-1$ } return "UNKNOWN"; //$NON-NLS-1$ }
Note that although this method is not API, clients have historically used it to force jobs shutdown in cases where OSGi shutdown does not occur. For this reason, this method should be considered near-API and should not be changed if at all possible.
/**
 * Shuts down the current job manager instance, if there is one, and then
 * discards the reference to it.
 *
 * Note that although this method is not API, clients have historically used
 * it to force jobs shutdown in cases where OSGi shutdown does not occur.
 * For this reason, this method should be considered near-API and should not
 * be changed if at all possible.
 */
public static void shutdown() {
    if (instance == null)
        return;
    instance.doShutdown();
    instance = null;
}

/**
 * Creates the job manager and stores it in the static instance field.
 * The queues, job sets and worker pool are created while holding the lock;
 * afterwards the internal worker is created and started and the job group
 * updater is registered as a job listener.
 */
private JobManager() {
    instance = this;
    synchronized (lock) {
        running = new HashSet<>(10);
        yielding = new HashSet<>(10);
        waiting = new JobQueue(false);
        waitingThreadJobs = new JobQueue(false, false);
        sleeping = new JobQueue(true);
        pool = new WorkerPool(this);
    }
    pool.setDaemon(JobOSGiUtils.getDefault().useDaemonThreads());
    internalWorker = new InternalWorker(this);
    internalWorker.setDaemon(JobOSGiUtils.getDefault().useDaemonThreads());
    internalWorker.start();
    jobListeners.add(jobGroupUpdater);
}

/**
 * Adds the given listener to the job listeners of this manager.
 */
@Override
public void addJobChangeListener(IJobChangeListener listener) {
    jobListeners.add(listener);
}

/**
 * Validates the rule and then begins it on behalf of the calling thread by
 * delegating to the implicit jobs helper.
 */
@Override
public void beginRule(ISchedulingRule rule, IProgressMonitor monitor) {
    validateRule(rule);
    final IProgressMonitor effectiveMonitor = monitorFor(monitor);
    implicitJobs.begin(rule, effectiveMonitor, false);
}
Cancels a job
/** * Cancels a job */
protected boolean cancel(InternalJob job) { IProgressMonitor monitor = null; boolean runCanceling = false; synchronized (lock) { // signal that the job should be canceled before it gets a chance to run job.setAboutToRunCanceled(true); switch (job.getState()) { case Job.NONE : return true; case Job.RUNNING : //cannot cancel a job that has already started (as opposed to ABOUT_TO_RUN) if (job.internalGetState() == Job.RUNNING) { monitor = job.getProgressMonitor(); runCanceling = !job.isRunCanceled(); if (runCanceling) job.setRunCanceled(true); break; } return false; default : changeState(job, Job.NONE); } } //call monitor and canceling outside sync block if (monitor != null) { if (runCanceling) { if (!monitor.isCanceled()) monitor.setCanceled(true); job.canceling(); } return false; } //only notify listeners if the job was waiting or sleeping jobListeners.done((Job) job, Status.CANCEL_STATUS, false); return true; } @Override public void cancel(Object family) { //don't synchronize because cancel calls listeners for (InternalJob internalJob : select(family)) cancel(internalJob); } void cancel(InternalJobGroup jobGroup) { cancel(jobGroup, false); } void cancel(InternalJobGroup jobGroup, boolean cancelDueToError) { Assert.isLegal(jobGroup != null, "jobGroup should not be null"); //$NON-NLS-1$ synchronized (lock) { switch (jobGroup.getState()) { case JobGroup.NONE : return; case JobGroup.CANCELING : if (!cancelDueToError) { // User cancellation takes precedence over the cancel due to error. jobGroup.updateCancelingReason(cancelDueToError); } return; default : jobGroup.cancelAndNotify(cancelDueToError); } } }
Atomically updates the state of a job, adding or removing from the necessary queues or sets.
/** * Atomically updates the state of a job, adding or removing from the * necessary queues or sets. */
private void changeState(InternalJob job, int newState) { boolean blockedJobs = false; synchronized (lock) { int oldJobState; synchronized (job.jobStateLock) { job.jobStateLock.notifyAll(); oldJobState = job.getState(); int oldState = job.internalGetState(); switch (oldState) { case InternalJob.YIELDING : yielding.remove(job); case Job.NONE : case InternalJob.ABOUT_TO_SCHEDULE : break; case InternalJob.BLOCKED : //remove this job from the linked list of blocked jobs job.remove(); break; case Job.WAITING : try { waiting.remove(job); } catch (RuntimeException e) { Assert.isLegal(false, "Tried to remove a job that wasn't in the queue"); //$NON-NLS-1$ } break; case Job.SLEEPING : try { sleeping.remove(job); } catch (RuntimeException e) { Assert.isLegal(false, "Tried to remove a job that wasn't in the queue"); //$NON-NLS-1$ } break; case Job.RUNNING : case InternalJob.ABOUT_TO_RUN : running.remove(job); //add any blocked jobs back to the wait queue InternalJob blocked = job.previous(); job.remove(); blockedJobs = blocked != null; while (blocked != null) { InternalJob previous = blocked.previous(); changeState(blocked, Job.WAITING); blocked = previous; } break; default : Assert.isLegal(false, "Invalid job state: " + job + ", state: " + oldState); //$NON-NLS-1$ //$NON-NLS-2$ } job.internalSetState(newState); switch (newState) { case Job.NONE : job.setStartTime(InternalJob.T_NONE); job.setWaitQueueStamp(InternalJob.T_NONE); job.setRunCanceled(false); case InternalJob.BLOCKED : break; case Job.WAITING : waiting.enqueue(job); break; case Job.SLEEPING : try { sleeping.enqueue(job); } catch (RuntimeException e) { throw new RuntimeException("Error changing from state: " + oldState); //$NON-NLS-1$ } break; case Job.RUNNING : case InternalJob.ABOUT_TO_RUN : // These flags must be reset in all cases, including resuming from yield job.setStartTime(InternalJob.T_NONE); job.setWaitQueueStamp(InternalJob.T_NONE); running.add(job); break; case InternalJob.YIELDING : yielding.add(job); 
case InternalJob.ABOUT_TO_SCHEDULE : break; default : Assert.isLegal(false, "Invalid job state: " + job + ", state: " + newState); //$NON-NLS-1$ //$NON-NLS-2$ } } InternalJobGroup jobGroup = job.getJobGroup(); if (jobGroup != null) { jobGroup.jobStateChanged(job, oldJobState, job.getState()); } } //notify queue outside sync block if (blockedJobs) pool.jobQueued(); }
Returns a new progress monitor for this job, belonging to the given progress group. Returns null if it is not a valid time to set the job's group.
/** * Returns a new progress monitor for this job, belonging to the given * progress group. Returns null if it is not a valid time to set the job's group. */
protected IProgressMonitor createMonitor(InternalJob job, IProgressMonitor group, int ticks) { synchronized (lock) { //group must be set before the job is scheduled //this includes the ABOUT_TO_SCHEDULE state, during which it is still //valid to set the progress monitor if (job.getState() != Job.NONE) return null; IProgressMonitor monitor = null; if (progressProvider != null) monitor = progressProvider.createMonitor((Job) job, group, ticks); if (monitor == null) monitor = new NullProgressMonitor(); return monitor; } }
Returns a new progress monitor for this job. Never returns null. @GuardedBy("lock")
/** * Returns a new progress monitor for this job. Never returns null. * @GuardedBy("lock") */
private IProgressMonitor createMonitor(Job job) { IProgressMonitor monitor = null; if (progressProvider != null) monitor = progressProvider.createMonitor(job); if (monitor == null) monitor = new NullProgressMonitor(); return monitor; } @Override public IProgressMonitor createProgressGroup() { if (progressProvider != null) return progressProvider.createProgressGroup(); return new NullProgressMonitor(); } @Override public Job currentJob() { Thread current = Thread.currentThread(); if (current instanceof Worker) return ((Worker) current).currentJob(); synchronized (lock) { for (InternalJob internalJob : running) { Job job = (Job) internalJob; if (job.getThread() == current) return job; } } return null; } @Override public ISchedulingRule currentRule() { //check thread job first, because actual current job may have null rule Job currentJob = implicitJobs.getThreadJob(Thread.currentThread()); if (currentJob != null) return currentJob.getRule(); currentJob = currentJob(); if (currentJob != null) return currentJob.getRule(); return null; }
Returns the delay in milliseconds that a job with a given priority can tolerate waiting.
/** * Returns the delay in milliseconds that a job with a given priority can * tolerate waiting. */
private long delayFor(int priority) { //these values may need to be tweaked based on machine speed switch (priority) { case Job.INTERACTIVE : return 0L; case Job.SHORT : return 50L; case Job.LONG : return 100L; case Job.BUILD : return 500L; case Job.DECORATE : return 1000L; default : Assert.isTrue(false, "Job has invalid priority: " + priority); //$NON-NLS-1$ return 0; } }
Performs the scheduling of a job.
Returns: true on success; false if cancelled or scheduled by another thread.
/** * Performs the scheduling of a job. * * @return true on success, false if cancelled, or scheduled by another thread */
private boolean doSchedule(InternalJob job, long delay) { boolean cancelling = false; synchronized (lock) { //job may have been canceled already int state = job.internalGetState(); if (state != InternalJob.ABOUT_TO_SCHEDULE && state != Job.SLEEPING) return false; if (job.isAboutToRunCanceled()) { cancelling = true; job.setResult(Status.CANCEL_STATUS); job.setProgressMonitor(null); job.setThread(null); changeState(job, Job.NONE); } else { // if it's a decoration job with no rule, don't run it right now if the system // is busy if (job.getPriority() == Job.DECORATE && job.getRule() == null) { long minDelay = running.size() * 100; delay = Math.max(delay, minDelay); } if (delay > 0) { job.setStartTime(System.currentTimeMillis() + delay); changeState(job, Job.SLEEPING); } else { job.setStartTime(System.currentTimeMillis() + delayFor(job.getPriority())); job.setWaitQueueStamp(waitQueueCounter.increment()); changeState(job, Job.WAITING); } } } if (cancelling) jobListeners.done((Job) job, Status.CANCEL_STATUS, false); return !cancelling; }
Shuts down the job manager. Currently running jobs will be told to stop, but worker threads may still continue processing. (note: This implemented IJobManager.shutdown which was removed due to problems caused by premature shutdown)
/** * Shuts down the job manager. Currently running jobs will be told * to stop, but worker threads may still continue processing. * (note: This implemented IJobManager.shutdown which was removed * due to problems caused by premature shutdown) */
private void doShutdown() { Job[] toCancel = null; synchronized (lock) { if (!active) return; active = false; //cancel all running jobs toCancel = running.toArray(new Job[running.size()]); //discard any jobs that have not yet started running sleeping.clear(); waiting.clear(); } // Give running jobs a chance to finish. Wait 0.1 seconds for up to 3 times. if (toCancel != null && toCancel.length > 0) { for (Job element : toCancel) { cancel(element); // cancel jobs outside sync block to avoid deadlock } for (int waitAttempts = 0; waitAttempts < 3; waitAttempts++) { Thread.yield(); synchronized (lock) { if (running.isEmpty()) break; } if (DEBUG_SHUTDOWN) { JobManager.debug("Shutdown - job wait cycle #" + (waitAttempts + 1)); //$NON-NLS-1$ Job[] stillRunning = null; synchronized (lock) { stillRunning = running.toArray(new Job[running.size()]); } if (stillRunning != null) { for (Job element : stillRunning) { JobManager.debug("\tJob: " + printJobName(element)); //$NON-NLS-1$ } } } try { Thread.sleep(100); } catch (InterruptedException e) { //ignore } Thread.yield(); } synchronized (lock) { // retrieve list of the jobs that are still running toCancel = running.toArray(new Job[running.size()]); } } internalWorker.cancel(); if (toCancel != null) { for (Job element : toCancel) { String jobName = printJobName(element); //this doesn't need to be translated because it's just being logged String msg = "Job found still running after platform shutdown. Jobs should be canceled by the plugin that scheduled them during shutdown: " + jobName; //$NON-NLS-1$ RuntimeLog.log(new Status(IStatus.WARNING, JobManager.PI_JOBS, JobManager.PLUGIN_ERROR, msg, null)); // TODO the RuntimeLog.log in its current implementation won't produce a log // during this stage of shutdown. For now add a standard error output. 
// One the logging story is improved, the System.err output below can be removed: System.err.println(msg); } } synchronized (lock) { //discard reference to any jobs still running at this point running.clear(); } pool.shutdown(); jobListeners.remove(jobGroupUpdater); }
Indicates that a job was running, and has now finished. Note that this method can be called under OutOfMemoryError conditions and thus must be paranoid about allocating objects.
/** * Indicates that a job was running, and has now finished. Note that this method * can be called under OutOfMemoryError conditions and thus must be paranoid * about allocating objects. */
protected void endJob(InternalJob job, IStatus result, boolean notify) { long rescheduleDelay = InternalJob.T_NONE; synchronized (lock) { //if the job is finishing asynchronously, there is nothing more to do for now if (result == Job.ASYNC_FINISH) return; //if job is not known then it cannot be done if (job.getState() == Job.NONE) return; if (JobManager.DEBUG && notify) JobManager.debug("Ending job: " + job); //$NON-NLS-1$ job.setResult(result); job.setProgressMonitor(null); job.setThread(null); rescheduleDelay = job.getStartTime(); changeState(job, Job.NONE); } //notify listeners outside sync block final boolean reschedule = active && rescheduleDelay > InternalJob.T_NONE && job.shouldSchedule(); if (notify) jobListeners.done((Job) job, result, reschedule); //reschedule the job if requested and we are still active if (reschedule) schedule(job, rescheduleDelay, reschedule); //log result if it is warning or error. When the job belongs to a job group defer the logging //until the whole group is completed (see JobManager#updateJobGroup). if (job.getJobGroup() == null && result.matches(IStatus.ERROR | IStatus.WARNING)) RuntimeLog.log(result); } @Override public void endRule(ISchedulingRule rule) { implicitJobs.end(rule, false); } @Override public Job[] find(Object family) { List<InternalJob> members = select(family); return members.toArray(new Job[members.size()]); } List<Job> find(InternalJobGroup jobGroup) { Assert.isLegal(jobGroup != null, "jobGroup should not be null"); //$NON-NLS-1$ synchronized (lock) { return jobGroup.internalGetActiveJobs(); } }
Returns a running or blocked job whose scheduling rule conflicts with the scheduling rule of the given waiting job. Returns null if there are no conflicting jobs. A job can only run if there are no running jobs and no blocked jobs whose scheduling rule conflicts with its rule.
/** * Returns a running or blocked job whose scheduling rule conflicts with the * scheduling rule of the given waiting job. Returns null if there are no * conflicting jobs. A job can only run if there are no running jobs and no blocked * jobs whose scheduling rule conflicts with its rule. */
protected InternalJob findBlockingJob(InternalJob waitingJob) { if (waitingJob.getRule() == null) return null; synchronized (lock) { if (running.isEmpty()) return null; //check the running jobs boolean hasBlockedJobs = false; for (InternalJob job : running) { if (waitingJob.isConflicting(job)) return job; if (!hasBlockedJobs) hasBlockedJobs = job.previous() != null; } //there are no blocked jobs, so we are done if (!hasBlockedJobs) return null; //check all jobs blocked by running jobs for (InternalJob job : running) { while (true) { job = job.previous(); if (job == null) break; if (waitingJob.isConflicting(job)) return job; } } } return null; }
Returns a job from the given collection whose scheduling rule conflicts with the scheduling rule of the given job. Returns null if there are no conflicting jobs.
/**
 * Returns a job from the given collection whose scheduling rule conflicts
 * with the scheduling rule of the given job. Returns null if there are no
 * conflicting jobs. The iterator is not advanced past the first conflicting job.
 */
InternalJob findBlockedJob(InternalJob job, Iterator jobs) {
    synchronized (lock) {
        InternalJob conflict = null;
        while (conflict == null && jobs.hasNext()) {
            final InternalJob candidate = (InternalJob) jobs.next();
            if (candidate.isConflicting(job))
                conflict = candidate;
        }
        return conflict;
    }
}

/**
 * Removes the job from the given queue while holding the lock.
 */
void dequeue(JobQueue queue, InternalJob job) {
    synchronized (lock) {
        queue.remove(job);
    }
}

/**
 * Adds the job to the given queue while holding the lock.
 */
void enqueue(JobQueue queue, InternalJob job) {
    synchronized (lock) {
        queue.enqueue(job);
    }
}

/**
 * Returns the lock manager of this job manager.
 */
public LockManager getLockManager() {
    return lockManager;
}
Returns a translated message indicating we are waiting for the given number of jobs to complete.
/**
 * Returns a translated message indicating we are waiting for the given
 * number of jobs to complete. A dedicated message is used for exactly one job.
 */
private String getWaitMessage(int jobCount) {
    final String template;
    if (jobCount == 1)
        template = JobMessages.jobs_waitFamSubOne;
    else
        template = JobMessages.jobs_waitFamSub;
    final String countText = Integer.toString(jobCount);
    return NLS.bind(template, countText);
}
Returns whether the job manager is active (has not been shutdown).
/**
 * Returns whether the job manager is active (has not been shutdown).
 *
 * @return the current value of the volatile active flag, which doShutdown sets to false
 */
protected boolean isActive() {
    // volatile read, no lock required
    return active;
}
Returns true if the given job is blocking the execution of a non-system job.
/**
 * Returns true if the given job is blocking the execution of a non-system
 * job.
 * <p>
 * A job that is not in the RUNNING state never blocks. Otherwise the jobs
 * queued behind it and the thread jobs waiting on a rule are examined.
 *
 * @param runningJob the job to test
 * @return <code>true</code> if a qualifying job is blocked by
 * <code>runningJob</code>, <code>false</code> otherwise
 */
protected boolean isBlocking(InternalJob runningJob) {
	synchronized (lock) {
		// if this job isn't running, it can't be blocking anyone
		if (runningJob.getState() != Job.RUNNING)
			return false;
		// if any job is queued behind this one, it is blocked by it
		InternalJob previous = runningJob.previous();
		while (previous != null) {
			// ignore jobs of lower priority (higher priority value means lower priority)
			if (previous.getPriority() < runningJob.getPriority()) {
				if (!previous.isSystem())
					return true;
				// implicit jobs should interrupt unless they act on behalf of system jobs
				if (previous instanceof ThreadJob && ((ThreadJob) previous).shouldInterrupt())
					return true;
			}
			previous = previous.previous();
		}
		// consider threads waiting on IJobManager#beginRule
		for (Iterator i = waitingThreadJobs.iterator(); i.hasNext();) {
			ThreadJob waitingJob = (ThreadJob) i.next();
			if (runningJob.isConflicting(waitingJob) && waitingJob.shouldInterrupt())
				return true;
		}
		// none found
		return false;
	}
}

/**
 * Returns <code>true</code> when both the running collection and the waiting
 * queue are empty. Sleeping jobs are not considered.
 */
@Override
public boolean isIdle() {
	synchronized (lock) {
		return running.isEmpty() && waiting.isEmpty();
	}
}

/**
 * Returns the current value of the <code>suspended</code> flag, read under the
 * job manager lock.
 */
@Override
public boolean isSuspended() {
	synchronized (lock) {
		return suspended;
	}
}

/**
 * Waits for the given job to finish, for at most <code>timeout</code>
 * milliseconds.
 *
 * @param job the job to wait for
 * @param timeout maximum time to wait in milliseconds; must not be negative.
 * A value of <code>0</code> disables the deadline check, so the wait is unbounded.
 * @param monitor polled for cancellation on every iteration; may be <code>null</code>
 * @return <code>true</code> if the job is in state NONE, if the manager is
 * suspended and the job is not running, or once the job's "done" notification
 * has released the barrier; <code>false</code> if the deadline passed first
 * @throws InterruptedException if the wait is interrupted and
 * <code>lockManager.canBlock()</code> returned <code>true</code>
 * @throws IllegalStateException if the job tries to join itself, or if an
 * unbounded join targets a job in the caller's own thread-limited job group
 * @throws OperationCanceledException if the monitor reports cancellation
 */
protected boolean join(InternalJob job, long timeout, IProgressMonitor monitor) throws InterruptedException {
	Assert.isLegal(timeout >= 0, "timeout should not be negative"); //$NON-NLS-1$
	// a deadline of 0 is the marker for "no deadline"
	long deadline = timeout == 0 ? 0 : System.currentTimeMillis() + timeout;
	Job currentJob = currentJob();
	if (currentJob != null) {
		JobGroup jobGroup = currentJob.getJobGroup();
		// reject an unbounded join on a job of the caller's own group when that group limits its threads
		if (timeout == 0 && jobGroup != null && jobGroup.getMaxThreads() != 0 && jobGroup == job.getJobGroup())
			throw new IllegalStateException("Joining on a job belonging to the same group is not allowed"); //$NON-NLS-1$
	}
	final IJobChangeListener listener;
	final Semaphore barrier;
	synchronized (lock) {
		int state = job.getState();
		if (state == Job.NONE)
			return true;
		//don't join a waiting or sleeping job when suspended (deadlock risk)
		if (suspended && state != Job.RUNNING)
			return true;
		//it's an error for a job to join itself
		if (state == Job.RUNNING && job.getThread() == Thread.currentThread())
			throw new IllegalStateException("Job attempted to join itself"); //$NON-NLS-1$
		//the semaphore will be released when the job is done
		barrier = new Semaphore(null);
		listener = new JobChangeAdapter() {
			@Override
			public void done(IJobChangeEvent event) {
				barrier.release();
			}
		};
		// the listener is registered while the lock is still held, right after the state check
		job.addJobChangeListener(listener);
	}
	//wait until listener notifies this thread.
	try {
		boolean canBlock = lockManager.canBlock();
		while (true) {
			if (monitor != null && monitor.isCanceled())
				throw new OperationCanceledException();
			// stays 0 when there is no deadline
			long remainingTime = deadline;
			if (deadline != 0) {
				remainingTime -= System.currentTimeMillis();
				if (remainingTime <= 0) {
					return false;
				}
			}
			//notify hook to service pending syncExecs before falling asleep
			lockManager.aboutToWait(job.getThread());
			try {
				// If remaining time is greater than MAX_WAIT_INTERVAL, sleep only for
				// MAX_WAIT_INTERVAL instead to be more responsive to monitor cancellation.
				long sleepTime = remainingTime != 0 && remainingTime <= MAX_WAIT_INTERVAL ? remainingTime : MAX_WAIT_INTERVAL;
				if (barrier.acquire(sleepTime))
					break;
			} catch (InterruptedException e) {
				// if non-UI thread, re-throw the exception
				if (canBlock)
					throw e;
				// if UI thread, loop and keep trying
			}
		}
	} finally {
		// runs on every exit path: normal completion, timeout, cancellation and interruption
		lockManager.aboutToRelease();
		job.removeJobChangeListener(listener);
	}
	return true;
}

/**
 * Waits until no job of the given family remains in the tracked set.
 * <p>
 * The set initially holds the family's running jobs, plus waiting and
 * sleeping ones unless the manager is suspended. A listener then adds jobs
 * that are scheduled or started, and removes jobs that finish without being
 * rescheduled. The set is polled every 100 milliseconds.
 *
 * @param family the family to wait for; <code>null</code> matches every job
 * @param monitor receives progress and blocked/unblocked notifications and is
 * polled for cancellation; replaced via <code>monitorFor</code> when <code>null</code>
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws OperationCanceledException if the monitor reports cancellation
 */
@Override
public void join(final Object family, IProgressMonitor monitor) throws InterruptedException, OperationCanceledException {
	monitor = monitorFor(monitor);
	IJobChangeListener listener = null;
	final Set<InternalJob> jobs;
	int jobCount;
	Job blocking = null;
	synchronized (lock) {
		//don't join a waiting or sleeping job when suspended (deadlock risk)
		int states = suspended ? Job.RUNNING : Job.RUNNING | Job.WAITING | Job.SLEEPING;
		// synchronized wrapper: the listener callbacks below modify this set while this method polls it
		jobs = Collections.synchronizedSet(new HashSet<>(select(family, states)));
		jobCount = jobs.size();
		if (jobCount > 0) {
			//if there is only one blocking job, use it in the blockage callback below
			if (jobCount == 1)
				blocking = (Job) jobs.iterator().next();
			listener = new JobChangeAdapter() {
				@Override
				public void done(IJobChangeEvent event) {
					//don't remove from list if job is being rescheduled
					if (!((JobChangeEvent) event).reschedule)
						jobs.remove(event.getJob());
				}

				//update the list of jobs if new ones are started during the join
				@Override
				public void running(IJobChangeEvent event) {
					Job job = event.getJob();
					if (family == null || job.belongsTo(family))
						jobs.add(job);
				}

				//update the list of jobs if new ones are scheduled during the join
				@Override
				public void scheduled(IJobChangeEvent event) {
					//don't add to list if job is being rescheduled
					if (((JobChangeEvent) event).reschedule)
						return;
					//if job manager is suspended we only wait for running jobs
					if (isSuspended())
						return;
					Job job = event.getJob();
					if (family == null || job.belongsTo(family))
						jobs.add(job);
				}
			};
			addJobChangeListener(listener);
		}
	}
	if (jobCount == 0) {
		//use up the monitor outside synchronized block because monitors call untrusted code
		monitor.beginTask(JobMessages.jobs_blocked0, 1);
		monitor.done();
		return;
	}
	//spin until all jobs are completed
	try {
		monitor.beginTask(JobMessages.jobs_blocked0, jobCount);
		monitor.subTask(getWaitMessage(jobCount));
		reportBlocked(monitor, blocking);
		int jobsLeft;
		int reportedWorkDone = 0;
		while ((jobsLeft = jobs.size()) > 0) {
			//don't let there be negative work done if new jobs have
			//been added since the join began
			int actualWorkDone = Math.max(0, jobCount - jobsLeft);
			if (reportedWorkDone < actualWorkDone) {
				monitor.worked(actualWorkDone - reportedWorkDone);
				reportedWorkDone = actualWorkDone;
				monitor.subTask(getWaitMessage(jobsLeft));
			}
			// Thread.interrupted() also clears the interrupt flag before the exception is thrown
			if (Thread.interrupted())
				throw new InterruptedException();
			if (monitor.isCanceled())
				throw new OperationCanceledException();
			//notify hook to service pending syncExecs before falling asleep
			lockManager.aboutToWait(null);
			Thread.sleep(100);
		}
	} finally {
		lockManager.aboutToRelease();
		removeJobChangeListener(listener);
		reportUnblocked(monitor);
		monitor.done();
	}
}

/**
 * Waits for the given job group to finish, for at most <code>timeout</code>
 * milliseconds.
 *
 * @param jobGroup the group to wait for; must not be <code>null</code>
 * @param timeout maximum time to wait in milliseconds; must not be negative.
 * A value of <code>0</code> disables the deadline check.
 * @param monitor progress monitor, converted to a <code>SubMonitor</code>;
 * <code>done()</code> is called on it at the end when it is not <code>null</code>
 * @return <code>true</code> if <code>jobGroup.doJoin</code> reported success, or
 * if the manager is suspended and the group has no running jobs;
 * <code>false</code> if the deadline passed first
 * @throws InterruptedException declared by this method; NOTE(review): presumably
 * raised by <code>jobGroup.doJoin</code> - confirm against InternalJobGroup
 * @throws OperationCanceledException if the monitor reports cancellation
 */
boolean join(InternalJobGroup jobGroup, long timeout, IProgressMonitor monitor) throws InterruptedException, OperationCanceledException {
	Assert.isLegal(jobGroup != null, "jobGroup should not be null"); //$NON-NLS-1$
	Assert.isLegal(timeout >= 0, "timeout should not be negative"); //$NON-NLS-1$
	// a deadline of 0 is the marker for "no deadline"
	long deadline = timeout == 0 ? 0 : System.currentTimeMillis() + timeout;
	int jobCount;
	synchronized (lock) {
		jobCount = jobGroup.getActiveJobsCount();
	}
	SubMonitor subMonitor = SubMonitor.convert(monitor, JobMessages.jobs_blocked0, jobCount);
	try {
		while (true) {
			if (subMonitor.isCanceled())
				throw new OperationCanceledException();
			long remainingTime = deadline;
			if (deadline != 0) {
				remainingTime -= System.currentTimeMillis();
				if (remainingTime <= 0) {
					return false;
				}
			}
			synchronized (lock) {
				// when suspended, only running jobs are waited for
				if ((suspended && jobGroup.getRunningJobsCount() == 0))
					break;
			}
			if (jobGroup.doJoin(remainingTime))
				break;
			int jobsLeft;
			synchronized (lock) {
				jobsLeft = jobGroup.getActiveJobsCount();
			}
			// report progress only when the number of active jobs went down
			if (jobsLeft < jobCount)
				subMonitor.worked(jobCount - jobsLeft);
			jobCount = jobsLeft;
			subMonitor.setWorkRemaining(jobCount);
			subMonitor.subTask(getWaitMessage(jobCount));
		}
	} finally {
		if (monitor != null) {
			monitor.done();
		}
	}
	return true;
}
Returns a non-null progress monitor instance. If the monitor is null, returns the default monitor supplied by the progress provider, or a NullProgressMonitor if no default monitor is available.
/**
 * Guarantees a usable progress monitor.
 * <p>
 * When the supplied monitor is <code>null</code> or a
 * <code>NullProgressMonitor</code> and a progress provider is installed, the
 * provider's default monitor is used instead. A failure in the provider is
 * logged and the supplied monitor is kept. If the result is still
 * <code>null</code>, a new <code>NullProgressMonitor</code> is returned.
 *
 * @param monitor the monitor supplied by the caller, possibly <code>null</code>
 * @return a monitor that is never <code>null</code>
 */
private IProgressMonitor monitorFor(IProgressMonitor monitor) {
	IProgressMonitor result = monitor;
	final boolean wantsDefault = result == null || result instanceof NullProgressMonitor;
	if (wantsDefault && progressProvider != null) {
		try {
			result = progressProvider.getDefaultMonitor();
		} catch (Exception e) {
			// the provider is client code: log the problem and carry on with what we have
			String msg = NLS.bind(JobMessages.meta_pluginProblems, JobManager.PI_JOBS);
			RuntimeLog.log(new Status(IStatus.ERROR, JobManager.PI_JOBS, JobManager.PLUGIN_ERROR, msg, e));
		}
	}
	return result != null ? result : new NullProgressMonitor();
}

/**
 * Creates a new lock by delegating to the lock manager.
 */
@Override
public ILock newLock() {
	return lockManager.newLock();
}
Removes and returns the first waiting job in the queue which is ready to run. Returns null if there are no items waiting in the queue. If an item is removed from the queue, it is moved to the running jobs list.
/**
 * Removes and returns the first waiting job in the queue which is ready to run.
 * Returns null if there are no items waiting in the queue. If an item is
 * removed from the queue, it is moved to the running jobs list.
 * <p>
 * Also returns null while the job manager is suspended. Sleeping jobs whose
 * start time has passed are moved to the waiting state first. Waiting jobs that
 * conflict with a running or blocked job are put in the BLOCKED state and
 * queued behind their blocker.
 *
 * @return the job that was moved to the ABOUT_TO_RUN state, or <code>null</code>
 */
private Job nextJob() {
	synchronized (lock) {
		// do nothing if the job manager is suspended
		if (suspended)
			return null;
		// tickle the sleep queue to see if anyone wakes up
		long now = System.currentTimeMillis();
		InternalJob job = sleeping.peek();
		while (job != null && job.getStartTime() < now) {
			job.setStartTime(now + delayFor(job.getPriority()));
			job.setWaitQueueStamp(waitQueueCounter.increment());
			changeState(job, Job.WAITING);
			// re-read the head of the sleep queue after the state change
			job = sleeping.peek();
		}
		InternalJobGroup jobGroup = null;
		// process the wait queue until we find a job whose rules are satisfied.
		job = waiting.peek();
		while (job != null) {
			InternalJob blocker = findBlockingJob(job);
			jobGroup = job.getJobGroup();
			// previous() method returns the next job in the queue.
			// It is captured before the job is possibly re-linked behind its blocker below.
			InternalJob nextWaitingJob = job.previous();
			if (blocker != null) {
				// queue this job after the job that's blocking it
				changeState(job, InternalJob.BLOCKED);
				// assert job does not already belong to some other data structure
				Assert.isTrue(job.next() == null);
				Assert.isTrue(job.previous() == null);
				blocker.addLast(job);
			} else if (jobGroup == null || jobGroup.getMaxThreads() == 0 || (jobGroup.getState() != JobGroup.CANCELING && jobGroup.getRunningJobsCount() < jobGroup.getMaxThreads())) {
				// runnable: no group, a group without a thread limit, or a non-canceling group below its limit
				break;
			}
			// skip this job as either this job is blocked on another job or
			// the maximum number of jobs from the same group are already running.
			// Reaching the queue's dummy entry means the whole queue has been visited.
			job = nextWaitingJob == waiting.dummy ? null : nextWaitingJob;
		}
		// the job to run must be in the running list before we exit
		// the sync block, otherwise two jobs with conflicting rules could start at once
		if (job != null) {
			changeState(job, InternalJob.ABOUT_TO_RUN);
			if (JobManager.DEBUG)
				JobManager.debug("Starting job: " + job); //$NON-NLS-1$
		}
		return (Job) job;
	}
}

/**
 * Refreshes the debug trace and all debug flags from the given options.
 * Every boolean flag defaults to <code>false</code> when its option is not set.
 */
@Override
public void optionsChanged(DebugOptions options) {
	DEBUG_TRACE = options.newDebugTrace(PI_JOBS);
	DEBUG = options.getBooleanOption(OPTION_DEBUG_JOBS, false);
	DEBUG_BEGIN_END = options.getBooleanOption(OPTION_DEBUG_BEGIN_END, false);
	DEBUG_YIELDING = options.getBooleanOption(OPTION_DEBUG_YIELDING, false);
	DEBUG_YIELDING_DETAILED = options.getBooleanOption(OPTION_DEBUG_YIELDING_DETAILED, false);
	DEBUG_DEADLOCK = options.getBooleanOption(OPTION_DEADLOCK_ERROR, false);
	DEBUG_LOCKS = options.getBooleanOption(OPTION_LOCKS, false);
	DEBUG_SHUTDOWN = options.getBooleanOption(OPTION_SHUTDOWN, false);
}

/**
 * Removes the given listener from the job listeners of this manager.
 */
@Override
public void removeJobChangeListener(IJobChangeListener listener) {
	jobListeners.remove(listener);
}
Report to the progress monitor that this thread is blocked, supplying an information message, and if possible the job that is causing the blockage. Important: An invocation of this method MUST be followed eventually by an invocation of reportUnblocked.
Params:
  • monitor – The monitor to report blocking to
  • blockingJob – The job that is blocking this thread, or null
See Also: reportUnblocked
/**
 * Tells the progress monitor that the calling thread is blocked, passing an
 * informational status and, where it can be named, the job causing the blockage.
 * Monitors that do not implement <code>IProgressMonitorWithBlocking</code> are ignored.
 * <p>
 * Important: an invocation of this method MUST eventually be followed by an
 * invocation of reportUnblocked.
 *
 * @param monitor The monitor to report blocking to
 * @param blockingJob The job that is blocking this thread, or <code>null</code>
 * @see #reportUnblocked
 */
final void reportBlocked(IProgressMonitor monitor, InternalJob blockingJob) {
	if (monitor instanceof IProgressMonitorWithBlocking) {
		// unknown blockers, implicit thread jobs and system jobs get the generic message
		final boolean generic = blockingJob == null || blockingJob instanceof ThreadJob || blockingJob.isSystem();
		final IStatus reason;
		if (generic) {
			reason = new Status(IStatus.INFO, JobManager.PI_JOBS, 1, JobMessages.jobs_blocked0, null);
		} else {
			String msg = NLS.bind(JobMessages.jobs_blocked1, blockingJob.getName());
			reason = new JobStatus(IStatus.INFO, (Job) blockingJob, msg);
		}
		((IProgressMonitorWithBlocking) monitor).setBlocked(reason);
	}
}
Reports that this thread was blocked, but is no longer blocked and is able to proceed.
Params:
  • monitor – The monitor to report unblocking to.
See Also: reportBlocked
/** * Reports that this thread was blocked, but is no longer blocked and is able * to proceed. * @param monitor The monitor to report unblocking to. * @see #reportBlocked */
final void reportUnblocked(IProgressMonitor monitor) { if (monitor instanceof IProgressMonitorWithBlocking) ((IProgressMonitorWithBlocking) monitor).clearBlocked(); } @Override public final void resume() { synchronized (lock) { suspended = false; //poke the job pool pool.jobQueued(); } } @Deprecated @Override public final void resume(ISchedulingRule rule) { implicitJobs.resume(rule); }
Attempts to immediately start a given job. Returns null if the job was successfully started, and the blocking job if it could not be started immediately due to a currently running job with a conflicting rule. Listeners will never be notified of jobs that are run in this way.
/** * Attempts to immediately start a given job. Returns null if the job was * successfully started, and the blocking job if it could not be started immediately * due to a currently running job with a conflicting rule. Listeners will never * be notified of jobs that are run in this way. */
protected InternalJob runNow(ThreadJob job, boolean releaseWaiting) { if (releaseWaiting) { synchronized (implicitJobs) { synchronized (lock) { return doRunNow(job, releaseWaiting); } } } synchronized (lock) { return doRunNow(job, releaseWaiting); } } private InternalJob doRunNow(ThreadJob job, boolean releaseWaiting) { InternalJob blocking = findBlockingJob(job); //cannot start if there is a conflicting job if (blocking == null) { changeState(job, Job.RUNNING); ((InternalJob) job).setProgressMonitor(new NullProgressMonitor()); job.run(null); if (releaseWaiting) { // atomically release waiting implicitJobs.removeWaiting(job); } } return blocking; } protected void schedule(InternalJob job, long delay, boolean reschedule) { if (!active) throw new IllegalStateException("Job manager has been shut down."); //$NON-NLS-1$ Assert.isNotNull(job, "Job is null"); //$NON-NLS-1$ Assert.isLegal(delay >= 0, "Scheduling delay is negative"); //$NON-NLS-1$ synchronized (lock) { if (!reschedule) job.setAboutToRunCanceled(false); //if the job is already running, set it to be rescheduled when done if (job.getState() == Job.RUNNING) { job.setStartTime(delay); return; } //can't schedule a job that is waiting or sleeping if (job.internalGetState() != Job.NONE) return; if (JobManager.DEBUG) JobManager.debug("Scheduling job: " + job); //$NON-NLS-1$ //remember that we are about to schedule the job //to prevent multiple schedule attempts from succeeding (bug 68452) changeState(job, InternalJob.ABOUT_TO_SCHEDULE); } //notify listeners outside sync block jobListeners.scheduled((Job) job, delay, reschedule); //schedule the job doSchedule(job, delay); //call the pool outside sync block to avoid deadlock pool.jobQueued(); }
Adds all family members in the list of jobs to the collection
/** * Adds all family members in the list of jobs to the collection */
private void select(List<InternalJob> members, Object family, InternalJob firstJob, int stateMask) { if (firstJob == null) return; InternalJob job = firstJob; do { //note that job state cannot be NONE at this point if ((family == null || job.belongsTo(family)) && ((job.getState() & stateMask) != 0)) members.add(job); job = job.previous(); } while (job != null && job != firstJob); }
Returns a list of all jobs known to the job manager that belong to the given family.
/**
 * Collects the waiting, sleeping and running jobs that belong to the given family.
 *
 * @param family the family to match; <code>null</code> matches every job
 * @return a new list with the matching jobs
 */
private List<InternalJob> select(Object family) {
	final int allTrackedStates = Job.WAITING | Job.SLEEPING | Job.RUNNING;
	return select(family, allTrackedStates);
}
Returns a list of all jobs known to the job manager that belong to the given family and are in one of the provided states.
/**
 * Collects the jobs that belong to the given family and are in one of the
 * given states. The running jobs, the waiting queue, the yielding jobs and
 * the sleeping queue are only visited when the mask asks for them.
 *
 * @param family the family to match; <code>null</code> matches every job
 * @param stateMask bit mask of the job states to accept
 * @return a new list with the matching jobs
 */
private List<InternalJob> select(Object family, int stateMask) {
	final boolean wantRunning = (stateMask & Job.RUNNING) != 0;
	final boolean wantWaiting = (stateMask & Job.WAITING) != 0;
	final boolean wantSleeping = (stateMask & Job.SLEEPING) != 0;
	final List<InternalJob> members = new ArrayList<>();
	synchronized (lock) {
		if (wantRunning) {
			for (InternalJob head : running) {
				select(members, family, head, stateMask);
			}
		}
		if (wantWaiting) {
			select(members, family, waiting.peek(), stateMask);
			// yielding jobs are visited together with the waiting ones
			for (InternalJob head : yielding) {
				select(members, family, head, stateMask);
			}
		}
		if (wantSleeping) {
			select(members, family, sleeping.peek(), stateMask);
		}
	}
	return members;
}

/**
 * Installs the given lock listener on the lock manager.
 */
@Override
public void setLockListener(LockListener listener) {
	lockManager.setLockListener(listener);
}
Changes a job priority.
/** * Changes a job priority. */
protected void setPriority(InternalJob job, int newPriority) { synchronized (lock) { int oldPriority = job.getPriority(); if (oldPriority == newPriority) return; job.internalSetPriority(newPriority); //if the job is waiting to run, re-shuffle the queue if (job.getState() == Job.WAITING) { long oldStart = job.getStartTime(); job.setStartTime(oldStart + (delayFor(newPriority) - delayFor(oldPriority))); waiting.resort(job); } } } @Override public void setProgressProvider(ProgressProvider provider) { progressProvider = provider; } public void setRule(InternalJob job, ISchedulingRule rule) { synchronized (lock) { //cannot change the rule of a job that is already running Assert.isLegal(job.getState() == Job.NONE); validateRule(rule); job.internalSetRule(rule); } }
Puts a job to sleep. Returns true if the job was successfully put to sleep.
/** * Puts a job to sleep. Returns true if the job was successfully put to sleep. */
protected boolean sleep(InternalJob job) { synchronized (lock) { switch (job.getState()) { case Job.RUNNING : //cannot be paused if it is already running (as opposed to ABOUT_TO_RUN) if (job.internalGetState() == Job.RUNNING) return false; //job hasn't started running yet (aboutToRun listener) break; case Job.SLEEPING : //update the job wake time job.setStartTime(InternalJob.T_INFINITE); //change state again to re-shuffle the sleep queue changeState(job, Job.SLEEPING); return true; case Job.NONE : return true; case Job.WAITING : //put the job to sleep break; } job.setStartTime(InternalJob.T_INFINITE); changeState(job, Job.SLEEPING); } jobListeners.sleeping((Job) job); return true; } @Override public void sleep(Object family) { //don't synchronize because sleep calls listeners for (InternalJob internalJob : select(family)) { sleep(internalJob); } }
Returns the estimated time in milliseconds before the next job is scheduled to wake up. The result may be negative. Returns InternalJob.T_INFINITE if there are no sleeping or waiting jobs.
/**
 * Returns the estimated time in milliseconds before the next job is scheduled
 * to wake up. The result may be negative. Returns InternalJob.T_INFINITE if
 * there are no sleeping or waiting jobs, or if the job manager is suspended.
 * Returns 0 as soon as the waiting queue is not empty.
 */
protected long sleepHint() {
	synchronized (lock) {
		//wait forever if job manager is suspended
		if (suspended)
			return InternalJob.T_INFINITE;
		if (!waiting.isEmpty())
			return 0L;
		//return the anticipated time that the next sleeping job will wake
		InternalJob next = sleeping.peek();
		if (next == null)
			return InternalJob.T_INFINITE;
		return next.getStartTime() - System.currentTimeMillis();
	}
}

/**
 * Implementation of {@link Job#yieldRule(IProgressMonitor)}
 * <p>
 * The job temporarily leaves the RUNNING state so that a job blocked by it can
 * run. The calling thread then waits for that job to leave its waiting state
 * and re-acquires the rule through <code>ThreadJob.joinRun</code>. Cancellation
 * is not supported while re-acquiring: the monitor is wrapped so that it never
 * reports cancellation.
 *
 * @param job the yielding job; must be RUNNING and owned by the calling thread
 * @param monitor progress monitor used while waiting to resume; passed through
 * <code>monitorFor</code>
 * @return the job that was unblocked, or <code>null</code> if no job was
 * blocked (in which case nothing is yielded). For a proxy thread job of a
 * resuming job, the real job is returned instead of the proxy.
 */
protected Job yieldRule(InternalJob job, IProgressMonitor monitor) {
	Thread currentThread = Thread.currentThread();
	Assert.isLegal(job.getState() == Job.RUNNING, "Cannot yieldRule job that is " + printState(job.internalGetState())); //$NON-NLS-1$
	Assert.isLegal(currentThread == job.getThread(), "Cannot yieldRule from outside job's thread"); //$NON-NLS-1$
	InternalJob unblocked;
	// If job is not a ThreadJob, and it has implicitly started rules, likeThreadJob
	// is the corresponding ThreadJob. Similarly, if likeThreadJob is not null, then
	// job is not a ThreadJob
	ThreadJob likeThreadJob;
	// lock order: implicitJobs first, then the job manager lock
	synchronized (implicitJobs) {
		synchronized (lock) {
			// The nested implicit job, if any
			likeThreadJob = implicitJobs.getThreadJob(currentThread);
			unblocked = job.previous();
			// if unblocked is not null, it was a blocked job. It is guaranteed
			// that it will be the next job run by the worker threads once this
			// lock is released.
			if (unblocked == null) {
				if (likeThreadJob != null) {
					// look for any explicit jobs we may be blocking
					unblocked = ((InternalJob) likeThreadJob).previous();
					if (unblocked == null) {
						// look for any implicit (or yielding) jobs we may be blocking.
						unblocked = findBlockedJob(likeThreadJob, waitingThreadJobs.iterator());
					}
				} else {
					// look for any implicit (or yielding) jobs we may be blocking.
					unblocked = findBlockedJob(job, waitingThreadJobs.iterator());
				}
			}
			// optimization: do nothing if we don't unblock any job
			if (unblocked == null)
				return null;
			// "release" our rule by exiting RUNNING state
			changeState(job, InternalJob.YIELDING);
			if (DEBUG_YIELDING)
				JobManager.debug(job + " will yieldRule to " + unblocked); //$NON-NLS-1$
			if (likeThreadJob != null && likeThreadJob != job) {
				// if there is a corresponding thread job, it needs yield as well
				changeState(likeThreadJob, InternalJob.YIELDING);
				// NOTE(review): this message repeats the one above and prints job, not likeThreadJob
				if (DEBUG_YIELDING)
					JobManager.debug(job + " will yieldRule to " + unblocked); //$NON-NLS-1$
			}
			if (likeThreadJob != null) {
				// only null-out threads out for non-ThreadJobs
				job.setThread(null);
				if (likeThreadJob.getRule() != null) {
					getLockManager().removeLockThread(currentThread, likeThreadJob.getRule());
				}
			}
			if ((job.getRule() != null) && !(job instanceof ThreadJob))
				getLockManager().removeLockThread(currentThread, job.getRule());
		}
	}
	// To prevent this job from immediately re-grabbing the scheduling rule wait until
	// the unblocked job changes state. This unblocked job is guaranteed to be the
	// next job of the set of similar conflicting rules to attempt to run.
	if (DEBUG_YIELDING_DETAILED)
		JobManager.debug(job + " is waiting for " + unblocked + " to transition from WAITING state"); //$NON-NLS-1$ //$NON-NLS-2$
	waitForUnblocked(unblocked);
	// restart this job, unless we've been restarted already
	// This is the same as ThreadJob begin, except that cancelation CAN NOT be supported
	// throwing the OperationCanceledException will return execution to the caller.
	IProgressMonitor mon = monitorFor(monitor);
	ProgressMonitorWrapper nonCanceling = new ProgressMonitorWrapper(mon) {
		@Override
		public boolean isCanceled() {
			// pass-through request
			getWrappedProgressMonitor().isCanceled();
			// ignore result
			return false;
		}
	};
	if (DEBUG_YIELDING)
		JobManager.debug(job + " waiting to resume"); //$NON-NLS-1$
	// this yielding job becomes an implicit job, unless it is one already
	if (likeThreadJob == null) {
		// Create a Threadjob proxy. This is strictly an internal job, but it is not
		// prevented from "leaking" out to clients in the form of listener
		// notifications, and via IJobManager API usage like find().
		// Set a flag to differentiate it from regular ThreadJobs.
		ThreadJob threadJob = new ThreadJob(job.getRule()) {
			@Override
			boolean isResumingAfterYield() {
				return true;
			}
		};
		threadJob.setRealJob((Job) job);
		ThreadJob.joinRun(threadJob, nonCanceling);
		// the following state changes are atomic
		synchronized (lock) {
			// Must end the temporary threadJob to remove from running list
			changeState(threadJob, Job.NONE);
			changeState(job, Job.RUNNING);
			job.setThread(currentThread);
		}
	} else {
		ThreadJob.joinRun(likeThreadJob, nonCanceling);
		synchronized (lock) {
			changeState(job, Job.RUNNING);
			job.setThread(currentThread);
		}
	}
	if (DEBUG_YIELDING) {
		// extra assert: make sure no other conflicting jobs are running now
		synchronized (lock) {
			for (InternalJob other : running) {
				if (other == job)
					continue;
				Assert.isTrue(!other.isConflicting(job), other + " conflicts and ran simultaneously with " + job); //$NON-NLS-1$
			}
		}
		JobManager.debug(job + " resumed"); //$NON-NLS-1$
	}
	if (unblocked instanceof ThreadJob && ((ThreadJob) unblocked).isResumingAfterYield()) {
		// if the unblocked job is a proxy for a yielding job to start, return
		// the original job. No need to expose the proxy ThreadJob.
		return ((ThreadJob) unblocked).realJob;
	}
	return (Job) unblocked;
}

/**
 * Blocks the calling thread until the given job is no longer waiting.
 * For a <code>ThreadJob</code> the <code>isWaiting</code> flag is polled, for
 * any other job the internal state. An interrupt received while waiting does
 * not end the wait; it is remembered and the thread's interrupt flag is
 * restored before returning.
 *
 * @param theJob the job to wait for
 */
private void waitForUnblocked(InternalJob theJob) {
	// wait until theJob leaves WAITING state
	boolean interrupted = false;
	synchronized (theJob.jobStateLock) {
		if (theJob instanceof ThreadJob) {
			// We can't acquire the implicitJob lock while holding jobStateLock,
			// so use isWaiting instead.
			while (((ThreadJob) theJob).isWaiting) {
				try {
					theJob.jobStateLock.wait();
				} catch (InterruptedException e) {
					// keep waiting; the interrupt is re-asserted below
					interrupted = true;
				}
			}
		} else {
			while (theJob.internalGetState() == Job.WAITING) {
				try {
					theJob.jobStateLock.wait();
				} catch (InterruptedException e) {
					// keep waiting; the interrupt is re-asserted below
					interrupted = true;
				}
			}
		}
	}
	if (interrupted)
		Thread.currentThread().interrupt();
}
Invokes Job.shouldRun() while guarding against unexpected failures.
/** * Invokes {@link Job#shouldRun()} while guarding against unexpected failures. */
private boolean shouldRun(Job job) { Throwable t; try { return job.shouldRun(); } catch (Exception | LinkageError | AssertionError e) { t = e; } RuntimeLog.log(new Status(IStatus.ERROR, JobManager.PI_JOBS, JobManager.PLUGIN_ERROR, "Error invoking shouldRun() method on: " + job, t)); //$NON-NLS-1$ //if the should is unexpectedly failing it is safer not to run it return false; }
Returns the next job to be run, or null if no jobs are waiting to run. The worker must call endJob when the job is finished running.
/**
 * Returns the next job to be run, or null if no jobs are waiting to run.
 * The worker must call endJob when the job is finished running.
 * <p>
 * A candidate job is ended with a cancel status and the next candidate is
 * tried when <code>shouldRun</code> vetoes it, when its job group is
 * canceling, or when its "about to run" was canceled. A candidate that is no
 * longer in the ABOUT_TO_RUN state after the listeners ran is skipped without
 * being ended.
 *
 * @param worker the worker thread that will run the job
 * @return the job that was moved to the RUNNING state, or <code>null</code>
 */
protected Job startJob(Worker worker) {
	Job job = null;
	while (true) {
		job = nextJob();
		if (job == null)
			return null;
		//must perform this outside sync block because it is third party code
		boolean shouldRun = shouldRun(job);
		//check for listener veto
		if (shouldRun)
			jobListeners.aboutToRun(job);
		//listeners may have canceled or put the job to sleep
		boolean endJob = false;
		synchronized (lock) {
			JobGroup jobGroup = job.getJobGroup();
			if (jobGroup != null && jobGroup.getState() == JobGroup.CANCELING)
				shouldRun = false;
			InternalJob internal = job;
			// lock order: the job manager lock first, then the job's state lock
			synchronized (internal.jobStateLock) {
				if (internal.internalGetState() == InternalJob.ABOUT_TO_RUN) {
					if (shouldRun && !internal.isAboutToRunCanceled()) {
						internal.setProgressMonitor(createMonitor(job));
						//change from ABOUT_TO_RUN to RUNNING
						internal.setThread(worker);
						internal.internalSetState(Job.RUNNING);
						// wake up threads waiting on the state lock for this job's state to change
						internal.jobStateLock.notifyAll();
						break;
					}
					endJob = true;
					//fall through and end the job below
				}
			}
		}
		if (endJob) {
			//job has been vetoed or canceled, so mark it as done
			endJob(job, Status.CANCEL_STATUS, true);
			continue;
		}
	}
	// reached only through the break above; listeners are notified outside the locks
	jobListeners.running(job);
	return job;
}

/**
 * Sets the suspended flag under the job manager lock.
 */
@Override
public final void suspend() {
	synchronized (lock) {
		suspended = true;
	}
}

/**
 * Suspends the given rule by delegating to the implicit jobs.
 * The rule must not be <code>null</code>; the monitor is passed through
 * <code>monitorFor</code> first.
 */
@Deprecated
@Override
public final void suspend(ISchedulingRule rule, IProgressMonitor monitor) {
	Assert.isNotNull(rule);
	implicitJobs.suspend(rule, monitorFor(monitor));
}

/**
 * Transfers the given rule to the destination thread by delegating to the
 * implicit jobs.
 */
@Override
public void transferRule(ISchedulingRule rule, Thread destinationThread) {
	implicitJobs.transfer(rule, destinationThread);
}
Validates that the given scheduling rule obeys the constraints of scheduling rules as described in the ISchedulingRule javadoc specification.
/** * Validates that the given scheduling rule obeys the constraints of * scheduling rules as described in the <code>ISchedulingRule</code> * javadoc specification. */
private void validateRule(ISchedulingRule rule) { //null rule always valid if (rule == null) return; if (rule instanceof MultiRule) { ISchedulingRule[] children = ((MultiRule) rule).getChildren(); for (ISchedulingRule element : children) { Assert.isLegal(element != rule); validateRule(element); } } //contains method must be reflexive Assert.isLegal(rule.contains(rule)); //contains method must return false when given an unknown rule Assert.isLegal(!rule.contains(nullRule)); //isConflicting method must be reflexive Assert.isLegal(rule.isConflicting(rule)); //isConflicting method must return false when given an unknown rule Assert.isLegal(!rule.isConflicting(nullRule)); } protected void wakeUp(InternalJob job, long delay) { Assert.isLegal(delay >= 0, "Scheduling delay is negative"); //$NON-NLS-1$ boolean scheduled; synchronized (lock) { //cannot wake up if it is not sleeping if (job.getState() != Job.SLEEPING) return; scheduled = doSchedule(job, delay); } //call the pool outside sync block to avoid deadlock pool.jobQueued(); //only notify of wake up if immediate if (scheduled && delay == 0) jobListeners.awake((Job) job); } @Override public void wakeUp(Object family) { //don't synchronize because wakeUp calls listeners for (InternalJob internalJob : select(family)) { wakeUp(internalJob, 0L); } } void endMonitoring(ThreadJob threadJob) { synchronized (monitorStack) { for (int i = monitorStack.size() - 1; i >= 0; i--) { if (monitorStack.get(i)[0] == threadJob) { monitorStack.remove(i); monitorStack.notifyAll(); break; } } } } void beginMonitoring(ThreadJob threadJob, IProgressMonitor monitor) { synchronized (monitorStack) { monitorStack.add(new Object[] {threadJob, monitor}); monitorStack.notifyAll(); } }
Listens for the job completion events and checks for the job group cancellation, computes and logs the group result.
/**
 * Listens for the job completion events and checks for the job group cancellation,
 * computes and logs the group result.
 */
private class JobGroupUpdater extends JobChangeAdapter {
	// monitor guarding all reads and updates of job group state in this listener
	Object jobManagerLock;

	/**
	 * @param jobManagerLock the object to synchronize on when job group state is
	 * read or changed
	 */
	public JobGroupUpdater(Object jobManagerLock) {
		this.jobManagerLock = jobManagerLock;
	}

	/**
	 * Handles the completion of a job that belongs to a job group: either ends
	 * the group and publishes its result on the event, or cancels the group when
	 * <code>shouldCancel</code> asks for it. Jobs without a group are ignored.
	 */
	@Override
	public void done(IJobChangeEvent event) {
		InternalJob job = event.getJob();
		InternalJobGroup jobGroup = job.getJobGroup();
		if (jobGroup == null)
			return;
		IStatus jobResult = event.getResult();
		boolean reschedule = ((JobChangeEvent) event).reschedule;
		int jobGroupState;
		int activeJobsCount;
		int failedJobsCount;
		int canceledJobsCount;
		int seedJobsRemainingCount;
		List<IStatus> jobResults = Collections.emptyList();
		synchronized (jobManagerLock) {
			// Collect the required details to check for the group cancellation and completion
			// outside the synchronized block.
			jobGroupState = jobGroup.getState();
			activeJobsCount = jobGroup.getActiveJobsCount();
			failedJobsCount = jobGroup.getFailedJobsCount();
			canceledJobsCount = jobGroup.getCanceledJobsCount();
			seedJobsRemainingCount = jobGroup.getSeedJobsRemainingCount();
			// the results are only needed when the group may be complete
			if (activeJobsCount == 0)
				jobResults = jobGroup.getCompletedJobResults();
		}
		// Check for the group completion.
		if (!reschedule && jobGroupState != JobGroup.NONE && activeJobsCount == 0 && (seedJobsRemainingCount <= 0 || jobGroupState == JobGroup.CANCELING)) {
			// Must perform this outside the sync block to avoid a potential deadlock
			MultiStatus jobGroupResult = jobGroup.computeGroupResult(jobResults);
			Assert.isLegal(jobGroupResult != null, "The group result should not be null"); //$NON-NLS-1$
			boolean isJobGroupCompleted = false;
			synchronized (jobManagerLock) {
				// If more jobs were added to the group while we were computing the result, the job group
				// remains in the ACTIVE state and the computed result is discarded to be recomputed later,
				// after the new jobs finish.
				if (jobGroup.getState() != JobGroup.NONE && jobGroup.getActiveJobsCount() == 0) {
					jobGroup.endJobGroup(jobGroupResult);
					isJobGroupCompleted = true;
				}
			}
			// If the job group is completing, add the job group's status to the event
			// and log errors and warnings.
			if (isJobGroupCompleted) {
				((JobChangeEvent) event).jobGroupResult = jobGroupResult;
				if (jobGroupResult.matches(IStatus.ERROR | IStatus.WARNING))
					RuntimeLog.log(jobGroupResult);
			}
			return;
		}
		// the group is not complete: cancel it if this job's result calls for it
		if (jobGroupState != JobGroup.CANCELING && jobGroup.shouldCancel(jobResult, failedJobsCount, canceledJobsCount))
			cancel(jobGroup, true);
	}
}
// closes the enclosing class, whose header is outside this section
}