Copyright (c) 2000, 2019 IBM Corporation and others. This program and the accompanying materials are made available under the terms of the Eclipse Public License 2.0 which accompanies this distribution, and is available at https://www.eclipse.org/legal/epl-2.0/ SPDX-License-Identifier: EPL-2.0 Contributors: IBM Corporation - initial API and implementation Paul Pazderski - Bug 545769: fixed rare UTF-8 character corruption bug
/******************************************************************************* * Copyright (c) 2000, 2019 IBM Corporation and others. * * This program and the accompanying materials * are made available under the terms of the Eclipse Public License 2.0 * which accompanies this distribution, and is available at * https://www.eclipse.org/legal/epl-2.0/ * * SPDX-License-Identifier: EPL-2.0 * * Contributors: * IBM Corporation - initial API and implementation * Paul Pazderski - Bug 545769: fixed rare UTF-8 character corruption bug *******************************************************************************/
package org.eclipse.debug.internal.core; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.core.runtime.ISafeRunnable; import org.eclipse.core.runtime.ListenerList; import org.eclipse.core.runtime.SafeRunner; import org.eclipse.debug.core.DebugPlugin; import org.eclipse.debug.core.IStreamListener; import org.eclipse.debug.core.model.IFlushableStreamMonitor;
Monitors the output stream of a system process and notifies listeners of additions to the stream. The output stream monitor reads system out (or err) via and input stream.
/** * Monitors the output stream of a system process and notifies * listeners of additions to the stream. * * The output stream monitor reads system out (or err) via * and input stream. */
public class OutputStreamMonitor implements IFlushableStreamMonitor {
The stream being monitored (connected system out or err).
/** * The stream being monitored (connected system out or err). */
private InputStream fStream;
A collection of listeners
/** * A collection of listeners */
private ListenerList<IStreamListener> fListeners = new ListenerList<>();
Whether content is being buffered
/** * Whether content is being buffered */
private boolean fBuffered = true;
The local copy of the stream contents
/** * The local copy of the stream contents */
private StringBuilder fContents;
The thread which reads from the stream
/** * The thread which reads from the stream */
private Thread fThread;
The size of the read buffer
/** * The size of the read buffer */
private static final int BUFFER_SIZE= 8192;
Whether or not this monitor has been killed. When the monitor is killed, it stops reading from the stream immediately.
/** * Whether or not this monitor has been killed. * When the monitor is killed, it stops reading * from the stream immediately. */
private boolean fKilled= false; private long lastSleep; private String fEncoding; private final AtomicBoolean fDone;
Creates an output stream monitor on the given stream (connected to system out or err).
Params:
  • stream – input stream to read from
  • encoding – stream encoding or null for system default
/** * Creates an output stream monitor on the * given stream (connected to system out or err). * * @param stream input stream to read from * @param encoding stream encoding or <code>null</code> for system default */
public OutputStreamMonitor(InputStream stream, String encoding) { fStream = new BufferedInputStream(stream, 8192); fEncoding = encoding; fContents= new StringBuilder(); fDone = new AtomicBoolean(false); } @Override public synchronized void addListener(IStreamListener listener) { fListeners.add(listener); }
Causes the monitor to close all communications between it and the underlying stream by waiting for the thread to terminate.
/** * Causes the monitor to close all * communications between it and the * underlying stream by waiting for the thread to terminate. */
protected void close() { if (fThread != null) { Thread thread= fThread; fThread= null; try { thread.join(); } catch (InterruptedException ie) { } fListeners = new ListenerList<>(); } }
Notifies the listeners that text has been appended to the stream.
Params:
  • text – the text that was appended to the stream
/** * Notifies the listeners that text has * been appended to the stream. * @param text the text that was appended to the stream */
private void fireStreamAppended(String text) { getNotifier().notifyAppend(text); } @Override public synchronized String getContents() { return fContents.toString(); } private void read() { try { internalRead(); } finally { fDone.set(true); } }
Continually reads from the stream.

This method, along with the startReading method is used to allow OutputStreamMonitor to implement Runnable without publicly exposing a run method.

/** * Continually reads from the stream. * <p> * This method, along with the <code>startReading</code> * method is used to allow <code>OutputStreamMonitor</code> * to implement <code>Runnable</code> without publicly * exposing a <code>run</code> method. */
private void internalRead() { lastSleep = System.currentTimeMillis(); long currentTime = lastSleep; char[] chars = new char[BUFFER_SIZE]; int read = 0; try (InputStreamReader reader = (fEncoding == null ? new InputStreamReader(fStream) : new InputStreamReader(fStream, fEncoding))) { while (read >= 0) { try { if (fKilled) { break; } read = reader.read(chars); if (read > 0) { String text = new String(chars, 0, read); synchronized (this) { if (isBuffered()) { fContents.append(text); } fireStreamAppended(text); } } } catch (IOException ioe) { if (!fKilled) { DebugPlugin.log(ioe); } return; } catch (NullPointerException e) { // killing the stream monitor while reading can cause an NPE // when reading from the stream if (!fKilled && fThread != null) { DebugPlugin.log(e); } return; } currentTime = System.currentTimeMillis(); if (currentTime - lastSleep > 1000) { lastSleep = currentTime; try { // just give up CPU to maintain UI responsiveness. Thread.sleep(1); } catch (InterruptedException e) { } } } } catch (IOException e) { DebugPlugin.log(e); } } protected void kill() { fKilled= true; } @Override public synchronized void removeListener(IStreamListener listener) { fListeners.remove(listener); }
Starts a thread which reads from the stream
/** * Starts a thread which reads from the stream */
protected void startMonitoring() { if (fThread == null) { fDone.set(false); fThread = new Thread((Runnable) this::read, DebugCoreMessages.OutputStreamMonitor_label); fThread.setDaemon(true); fThread.setPriority(Thread.MIN_PRIORITY); fThread.start(); } } @Override public synchronized void setBuffered(boolean buffer) { fBuffered = buffer; } @Override public synchronized void flushContents() { fContents.setLength(0); } @Override public synchronized boolean isBuffered() { return fBuffered; } private ContentNotifier getNotifier() { return new ContentNotifier(); }
Returns:true if reading the underlying stream is done. false if reading the stream has not started or is not done.
/** * @return {@code true} if reading the underlying stream is done. * {@code false} if reading the stream has not started or is not done. */
public boolean isReadingDone() { return fDone.get(); } class ContentNotifier implements ISafeRunnable { private IStreamListener fListener; private String fText; @Override public void handleException(Throwable exception) { DebugPlugin.log(exception); } @Override public void run() throws Exception { fListener.streamAppended(fText, OutputStreamMonitor.this); } public void notifyAppend(String text) { if (text == null) { return; } fText = text; for (IStreamListener iStreamListener : fListeners) { fListener = iStreamListener; SafeRunner.run(this); } fListener = null; fText = null; } } }