package io.vertx.mysqlclient.impl.codec;
import io.netty.buffer.ByteBuf;
import io.vertx.sqlclient.impl.command.ExtendedQueryCommand;
import static io.vertx.mysqlclient.impl.codec.Packets.*;
class ExtendedQueryCommandCodec<R> extends ExtendedQueryCommandBaseCodec<R, ExtendedQueryCommand<R>> {
ExtendedQueryCommandCodec(ExtendedQueryCommand<R> cmd) {
super(cmd);
if (cmd.fetch() > 0 && statement.isCursorOpen) {
columnDefinitions = statement.rowDesc.columnDefinitions();
}
}
@Override
void encode(MySQLEncoder encoder) {
super.encode(encoder);
if (statement.isCursorOpen) {
decoder = new RowResultDecoder<>(cmd.collector(), false, statement.rowDesc);
sendStatementFetchCommand(statement.statementId, cmd.fetch());
} else {
if (cmd.fetch() > 0) {
sendStatementExecuteCommand(statement.statementId, statement.paramDesc.paramDefinitions(), sendType, cmd.params(), (byte) 0x01);
} else {
sendStatementExecuteCommand(statement.statementId, statement.paramDesc.paramDefinitions(), sendType, cmd.params(), (byte) 0x00);
}
}
}
@Override
void decodePayload(ByteBuf payload, int payloadLength, int sequenceId) {
if (statement.isCursorOpen) {
int first = payload.getUnsignedByte(payload.readerIndex());
if (first == ERROR_PACKET_HEADER) {
handleErrorPacketPayload(payload);
} else {
handleRows(payload, payloadLength, super::handleSingleRow);
}
} else {
if (cmd.fetch() > 0) {
switch (commandHandlerState) {
case INIT:
int first = payload.getUnsignedByte(payload.readerIndex());
if (first == ERROR_PACKET_HEADER) {
handleErrorPacketPayload(payload);
} else {
handleResultsetColumnCountPacketBody(payload);
}
break;
case HANDLING_COLUMN_DEFINITION:
handleResultsetColumnDefinitions(payload);
break;
case COLUMN_DEFINITIONS_DECODING_COMPLETED:
skipEofPacketIfNeeded(payload);
case HANDLING_ROW_DATA_OR_END_PACKET:
handleResultsetColumnDefinitionsDecodingCompleted();
this.sequenceId = 0;
decoder = new RowResultDecoder<>(cmd.collector(), false, statement.rowDesc);
statement.isCursorOpen = true;
sendStatementFetchCommand(statement.statementId, cmd.fetch());
break;
default:
throw new IllegalStateException("Unexpected state for decoding COM_STMT_EXECUTE response with cursor opening");
}
} else {
super.decodePayload(payload, payloadLength, sequenceId);
}
}
}
private void sendStatementFetchCommand(long statementId, int count) {
ByteBuf packet = allocateBuffer();
int packetStartIdx = packet.writerIndex();
packet.writeMediumLE(0);
packet.writeByte(sequenceId);
packet.writeByte(CommandType.COM_STMT_FETCH);
packet.writeIntLE((int) statementId);
packet.writeIntLE(count);
int lenOfPayload = packet.writerIndex() - packetStartIdx - 4;
packet.setMediumLE(packetStartIdx, lenOfPayload);
encoder.chctx.writeAndFlush(packet);
}
}