/*
 * Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.server;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.ext.WriterInterceptor;

import javax.inject.Provider;

import org.glassfish.jersey.process.internal.RequestContext;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.server.internal.LocalizationMessages;
import org.glassfish.jersey.server.internal.process.MappableException;

Used for sending messages in "typed" chunks. Useful for long running processes, which needs to produce partial responses.
Author:Pavel Bucek, Martin Matula, Marek Potociar
Type parameters:
  • <T> – chunk type.
/** * Used for sending messages in "typed" chunks. Useful for long running processes, * which needs to produce partial responses. * * @param <T> chunk type. * @author Pavel Bucek * @author Martin Matula * @author Marek Potociar */
// TODO: something like prequel/sequel - usable for EventChannelWriter and XML related writers public class ChunkedOutput<T> extends GenericType<T> implements Closeable { private static final byte[] ZERO_LENGTH_DELIMITER = new byte[0]; private final BlockingDeque<T> queue = new LinkedBlockingDeque<>(); private final byte[] chunkDelimiter; private final AtomicBoolean resumed = new AtomicBoolean(false); private boolean flushing = false; private volatile boolean closed = false; private volatile AsyncContext asyncContext; private volatile RequestScope requestScope; private volatile RequestContext requestScopeContext; private volatile ContainerRequest requestContext; private volatile ContainerResponse responseContext; private volatile ConnectionCallback connectionCallback;
Create new ChunkedOutput.
/** * Create new {@code ChunkedOutput}. */
protected ChunkedOutput() { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; }
Create ChunkedOutput with specified type.
Params:
  • chunkType – chunk type. Must not be {code null}.
/** * Create {@code ChunkedOutput} with specified type. * * @param chunkType chunk type. Must not be {code null}. */
public ChunkedOutput(final Type chunkType) { super(chunkType); this.chunkDelimiter = ZERO_LENGTH_DELIMITER; }
Create new ChunkedOutput with a custom chunk delimiter.
Params:
  • chunkDelimiter – custom chunk delimiter bytes. Must not be {code null}.
Since:2.4.1
/** * Create new {@code ChunkedOutput} with a custom chunk delimiter. * * @param chunkDelimiter custom chunk delimiter bytes. Must not be {code null}. * @since 2.4.1 */
protected ChunkedOutput(final byte[] chunkDelimiter) { if (chunkDelimiter.length > 0) { this.chunkDelimiter = new byte[chunkDelimiter.length]; System.arraycopy(chunkDelimiter, 0, this.chunkDelimiter, 0, chunkDelimiter.length); } else { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; } }
Create new ChunkedOutput with a custom chunk delimiter.
Params:
  • chunkDelimiter – custom chunk delimiter bytes. Must not be {code null}.
Since:2.4.1
/** * Create new {@code ChunkedOutput} with a custom chunk delimiter. * * @param chunkDelimiter custom chunk delimiter bytes. Must not be {code null}. * @since 2.4.1 */
protected ChunkedOutput(final byte[] chunkDelimiter, Provider<AsyncContext> asyncContextProvider) { if (chunkDelimiter.length > 0) { this.chunkDelimiter = new byte[chunkDelimiter.length]; System.arraycopy(chunkDelimiter, 0, this.chunkDelimiter, 0, chunkDelimiter.length); } else { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; } this.asyncContext = asyncContextProvider == null ? null : asyncContextProvider.get(); }
Create new ChunkedOutput with a custom chunk delimiter.
Params:
  • chunkType – chunk type. Must not be {code null}.
  • chunkDelimiter – custom chunk delimiter bytes. Must not be {code null}.
Since:2.4.1
/** * Create new {@code ChunkedOutput} with a custom chunk delimiter. * * @param chunkType chunk type. Must not be {code null}. * @param chunkDelimiter custom chunk delimiter bytes. Must not be {code null}. * @since 2.4.1 */
public ChunkedOutput(final Type chunkType, final byte[] chunkDelimiter) { super(chunkType); if (chunkDelimiter.length > 0) { this.chunkDelimiter = new byte[chunkDelimiter.length]; System.arraycopy(chunkDelimiter, 0, this.chunkDelimiter, 0, chunkDelimiter.length); } else { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; } }
Create new ChunkedOutput with a custom chunk delimiter.
Params:
  • chunkDelimiter – custom chunk delimiter string. Must not be {code null}.
Since:2.4.1
/** * Create new {@code ChunkedOutput} with a custom chunk delimiter. * * @param chunkDelimiter custom chunk delimiter string. Must not be {code null}. * @since 2.4.1 */
protected ChunkedOutput(final String chunkDelimiter) { if (chunkDelimiter.isEmpty()) { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; } else { this.chunkDelimiter = chunkDelimiter.getBytes(); } }
Create new ChunkedOutput with a custom chunk delimiter.
Params:
  • chunkType – chunk type. Must not be {code null}.
  • chunkDelimiter – custom chunk delimiter string. Must not be {code null}.
Since:2.4.1
/** * Create new {@code ChunkedOutput} with a custom chunk delimiter. * * @param chunkType chunk type. Must not be {code null}. * @param chunkDelimiter custom chunk delimiter string. Must not be {code null}. * @since 2.4.1 */
public ChunkedOutput(final Type chunkType, final String chunkDelimiter) { super(chunkType); if (chunkDelimiter.isEmpty()) { this.chunkDelimiter = ZERO_LENGTH_DELIMITER; } else { this.chunkDelimiter = chunkDelimiter.getBytes(); } }
Write a chunk.
Params:
  • chunk – a chunk instance to be written.
Throws:
  • IOException – if this response is closed or when encountered any problem during serializing or writing a chunk.
/** * Write a chunk. * * @param chunk a chunk instance to be written. * @throws IOException if this response is closed or when encountered any problem during serializing or writing a chunk. */
public void write(final T chunk) throws IOException { if (closed) { throw new IOException(LocalizationMessages.CHUNKED_OUTPUT_CLOSED()); } if (chunk != null) { queue.add(chunk); } flushQueue(); } protected void flushQueue() throws IOException { if (resumed.compareAndSet(false, true) && asyncContext != null) { asyncContext.resume(this); } if (requestScopeContext == null || requestContext == null || responseContext == null) { return; } Exception ex = null; try { requestScope.runInScope(requestScopeContext, new Callable<Void>() { @Override public Void call() throws IOException { boolean shouldClose; T t; synchronized (ChunkedOutput.this) { if (flushing) { // if another thread is already flushing the queue, we don't have to do anything return null; } // remember the closed flag before polling the queue // (if we did it after, we could miss the last chunk as some other thread may add a chunk // and set closed to true right after we have polled the queue (i.e. we'd think the queue is empty), // but before we check if we should close - so we would close the stream leaving the last chunk // undelivered) shouldClose = closed; t = queue.poll(); if (t != null || shouldClose) { // no other thread is flushing this queue at the moment and it is not empty and/or we should close -> // set the flushing flag so that other threads know it is already being taken care of // and they don't have to bother flushing = true; } } while (t != null) { try { final OutputStream origStream = responseContext.getEntityStream(); final OutputStream writtenStream = requestContext.getWorkers().writeTo( t, t.getClass(), getType(), responseContext.getEntityAnnotations(), responseContext.getMediaType(), responseContext.getHeaders(), requestContext.getPropertiesDelegate(), origStream, // The output stream stored in the response context for this chunked output // is already intercepted as a whole (if there are any interceptors); // no need to intercept the individual chunks. Collections.<WriterInterceptor>emptyList()); //noinspection ArrayEquality if (chunkDelimiter != ZERO_LENGTH_DELIMITER) { // if the chunked output is configured with a custom delimiter, use it writtenStream.write(chunkDelimiter); } // flush the chunk (some writers do it, but some don't) writtenStream.flush(); if (origStream != writtenStream) { // if MBW replaced the stream, let's make sure to set it in the response context. responseContext.setEntityStream(writtenStream); } } catch (final IOException ioe) { connectionCallback.onDisconnect(asyncContext); throw ioe; } catch (final MappableException mpe) { if (mpe.getCause() instanceof IOException) { connectionCallback.onDisconnect(asyncContext); } throw mpe; } t = queue.poll(); if (t == null) { synchronized (ChunkedOutput.this) { // queue seems empty // check again in the synchronized block before clearing the flushing flag // first remember the closed flag (this has to be before polling the queue, // otherwise we could miss the last chunk) shouldClose = closed; t = queue.poll(); if (t == null) { // ok, it is really empty - if anyone adds a chunk while we are here, // other thread will take care of it -> flush the stream and unset // the flushing flag at the very end (to make sure it is unset only if no // exception is thrown) responseContext.commitStream(); // if closing, we keep the "flushing" flag set, since no other thread needs to flush // this queue anymore - finally clause will take care of closing the stream flushing = shouldClose; break; } } } } return null; } }); } catch (final Exception e) { closed = true; // remember the exception (it will get rethrown from finally clause, once it does it's work) ex = e; } finally { if (closed) { try { responseContext.close(); } catch (final Exception e) { // if no exception remembered before, remember this one // otherwise the previously remembered exception (from catch clause) takes precedence ex = ex == null ? e : ex; } requestScopeContext.release(); // rethrow remembered exception (if any) if (ex instanceof IOException) { //noinspection ThrowFromFinallyBlock throw (IOException) ex; } else if (ex instanceof RuntimeException) { //noinspection ThrowFromFinallyBlock throw (RuntimeException) ex; } } } }
Close this response - it will be finalized and underlying connections will be closed or made available for another response.
/** * Close this response - it will be finalized and underlying connections will be closed * or made available for another response. */
@Override public void close() throws IOException { closed = true; flushQueue(); }
Get state information. Please note that ChunkedOutput can be closed by the client side - client can close connection from its side.
Returns:true when closed, false otherwise.
/** * Get state information. * * Please note that {@code ChunkedOutput} can be closed by the client side - client can close connection * from its side. * * @return true when closed, false otherwise. */
public boolean isClosed() { return closed; } @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") @Override public boolean equals(final Object obj) { return this == obj; } @Override public int hashCode() { int result = super.hashCode(); result = 31 * result + queue.hashCode(); return result; } @Override public String toString() { return "ChunkedOutput<" + getType() + ">"; }
Set context used for writing chunks.
Params:
  • requestScope – request scope.
  • requestScopeContext – current request context instance.
  • requestContext – request context.
  • responseContext – response context.
  • connectionCallbackRunner – connection callback.
Throws:
  • IOException – when encountered any problem during serializing or writing a chunk.
/** * Set context used for writing chunks. * * @param requestScope request scope. * @param requestScopeContext current request context instance. * @param requestContext request context. * @param responseContext response context. * @param connectionCallbackRunner connection callback. * @throws IOException when encountered any problem during serializing or writing a chunk. */
void setContext(final RequestScope requestScope, final RequestContext requestScopeContext, final ContainerRequest requestContext, final ContainerResponse responseContext, final ConnectionCallback connectionCallbackRunner) throws IOException { this.requestScope = requestScope; this.requestScopeContext = requestScopeContext; this.requestContext = requestContext; this.responseContext = responseContext; this.connectionCallback = connectionCallbackRunner; flushQueue(); } }