/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2014 Red Hat, Inc., and individual contributors
 * as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package io.undertow.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.StatusCodes;

Author:Stuart Douglas
/** * @author Stuart Douglas */
public class BlockingReceiverImpl implements Receiver { private static final ErrorCallback END_EXCHANGE = new ErrorCallback() { @Override public void error(HttpServerExchange exchange, IOException e) { if(!exchange.isResponseStarted()) { exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR); } exchange.setPersistent(false); UndertowLogger.REQUEST_IO_LOGGER.ioException(e); exchange.endExchange(); } }; public static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; private final HttpServerExchange exchange; private final InputStream inputStream; private int maxBufferSize = -1; private boolean done = false; public BlockingReceiverImpl(HttpServerExchange exchange, InputStream inputStream) { this.exchange = exchange; this.inputStream = inputStream; } @Override public void setMaxBufferSize(int maxBufferSize) { this.maxBufferSize = maxBufferSize; } @Override public void receiveFullString(final FullStringCallback callback, ErrorCallback errorCallback) { receiveFullString(callback, errorCallback, StandardCharsets.ISO_8859_1); } @Override public void receiveFullString(FullStringCallback callback) { receiveFullString(callback, END_EXCHANGE, StandardCharsets.ISO_8859_1); } @Override public void receivePartialString(PartialStringCallback callback, ErrorCallback errorCallback) { receivePartialString(callback, errorCallback, StandardCharsets.ISO_8859_1); } @Override public void receivePartialString(PartialStringCallback callback) { receivePartialString(callback, END_EXCHANGE, StandardCharsets.ISO_8859_1); } @Override public void receiveFullString(final FullStringCallback callback, final ErrorCallback errorCallback, final Charset charset) { if(done) { throw UndertowMessages.MESSAGES.requestBodyAlreadyRead(); } final ErrorCallback error = errorCallback == null ? END_EXCHANGE : errorCallback; if (callback == null) { throw UndertowMessages.MESSAGES.argumentCannotBeNull("callback"); } if (exchange.isRequestComplete()) { callback.handle(exchange, ""); return; } String contentLengthString = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH); long contentLength; final ByteArrayOutputStream sb; if (contentLengthString != null) { contentLength = Long.parseLong(contentLengthString); if (contentLength > Integer.MAX_VALUE) { error.error(exchange, new RequestToLargeException()); return; } sb = new ByteArrayOutputStream((int) contentLength); } else { contentLength = -1; sb = new ByteArrayOutputStream(); } if (maxBufferSize > 0) { if (contentLength > maxBufferSize) { error.error(exchange, new RequestToLargeException()); return; } } int s; try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()) { while ((s = inputStream.read(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), pooled.getBuffer().remaining())) > 0) { sb.write(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), s); } callback.handle(exchange, sb.toString(charset.name())); } catch (IOException e) { error.error(exchange, e); } } @Override public void receiveFullString(FullStringCallback callback, Charset charset) { receiveFullString(callback, END_EXCHANGE, charset); } @Override public void receivePartialString(final PartialStringCallback callback, final ErrorCallback errorCallback, Charset charset) { if(done) { throw UndertowMessages.MESSAGES.requestBodyAlreadyRead(); } final ErrorCallback error = errorCallback == null ? END_EXCHANGE : errorCallback; if (callback == null) { throw UndertowMessages.MESSAGES.argumentCannotBeNull("callback"); } if (exchange.isRequestComplete()) { callback.handle(exchange, "", true); return; } String contentLengthString = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH); long contentLength; if (contentLengthString != null) { contentLength = Long.parseLong(contentLengthString); if (contentLength > Integer.MAX_VALUE) { error.error(exchange, new RequestToLargeException()); return; } } else { contentLength = -1; } if (maxBufferSize > 0) { if (contentLength > maxBufferSize) { error.error(exchange, new RequestToLargeException()); return; } } CharsetDecoder decoder = charset.newDecoder(); int s; try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()) { while ((s = inputStream.read(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), pooled.getBuffer().remaining())) > 0) { pooled.getBuffer().limit(s); CharBuffer res = decoder.decode(pooled.getBuffer()); callback.handle(exchange, res.toString(), false); pooled.getBuffer().clear(); } callback.handle(exchange, "", true); } catch (IOException e) { error.error(exchange, e); } } @Override public void receivePartialString(PartialStringCallback callback, Charset charset) { receivePartialString(callback, END_EXCHANGE, charset); } @Override public void receiveFullBytes(final FullBytesCallback callback, final ErrorCallback errorCallback) { if(done) { throw UndertowMessages.MESSAGES.requestBodyAlreadyRead(); } final ErrorCallback error = errorCallback == null ? END_EXCHANGE : errorCallback; if (callback == null) { throw UndertowMessages.MESSAGES.argumentCannotBeNull("callback"); } if (exchange.isRequestComplete()) { callback.handle(exchange, EMPTY_BYTE_ARRAY); return; } String contentLengthString = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH); long contentLength; final ByteArrayOutputStream sb; if (contentLengthString != null) { contentLength = Long.parseLong(contentLengthString); if (contentLength > Integer.MAX_VALUE) { error.error(exchange, new RequestToLargeException()); return; } sb = new ByteArrayOutputStream((int) contentLength); } else { contentLength = -1; sb = new ByteArrayOutputStream(); } if (maxBufferSize > 0) { if (contentLength > maxBufferSize) { error.error(exchange, new RequestToLargeException()); return; } } int s; try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()) { while ((s = inputStream.read(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), pooled.getBuffer().remaining())) > 0) { sb.write(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), s); } callback.handle(exchange, sb.toByteArray()); } catch (IOException e) { error.error(exchange, e); } } @Override public void receiveFullBytes(FullBytesCallback callback) { receiveFullBytes(callback, END_EXCHANGE); } @Override public void receivePartialBytes(final PartialBytesCallback callback, final ErrorCallback errorCallback) { if(done) { throw UndertowMessages.MESSAGES.requestBodyAlreadyRead(); } final ErrorCallback error = errorCallback == null ? END_EXCHANGE : errorCallback; if (callback == null) { throw UndertowMessages.MESSAGES.argumentCannotBeNull("callback"); } if (exchange.isRequestComplete()) { callback.handle(exchange, EMPTY_BYTE_ARRAY, true); return; } String contentLengthString = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH); long contentLength; if (contentLengthString != null) { contentLength = Long.parseLong(contentLengthString); if (contentLength > Integer.MAX_VALUE) { error.error(exchange, new RequestToLargeException()); return; } } else { contentLength = -1; } if (maxBufferSize > 0) { if (contentLength > maxBufferSize) { error.error(exchange, new RequestToLargeException()); return; } } int s; try (PooledByteBuffer pooled = exchange.getConnection().getByteBufferPool().getArrayBackedPool().allocate()) { while ((s = inputStream.read(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), pooled.getBuffer().remaining())) > 0) { byte[] newData = new byte[s]; System.arraycopy(pooled.getBuffer().array(), pooled.getBuffer().arrayOffset(), newData, 0, s); callback.handle(exchange, newData, false); } callback.handle(exchange, EMPTY_BYTE_ARRAY, true); } catch (IOException e) { error.error(exchange, e); } } @Override public void receivePartialBytes(PartialBytesCallback callback) { receivePartialBytes(callback, END_EXCHANGE); } @Override public void pause() { //noop for blocking receiver } @Override public void resume() { //noop for blocking receiver } }