aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java368
1 files changed, 368 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
new file mode 100644
index 000000000..26fade6d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
+
+/**
+ * Fronts a seekable {@link InputStream} with a byte-at-a-time read interface backed by a {@link DrillBuf}.
+ * Also confines reading to the lines that belong to the split bounded by {@code startPos} and {@code endPos}.
+ */
+final class TextInput {
+
+ private final byte[] lineSeparator; // byte sequence treated as a newline (settings.getNewLineDelimiter())
+ private final byte normalizedLineSeparator; // byte that nextChar() returns in place of a matched lineSeparator
+ private final TextParsingSettingsV3 settings;
+
+ private long lineCount; // line separators matched by nextChar() since start()
+ private long charCount; // sum of the bufferPtr values folded in at each buffer refill
+
+ /**
+ * Split bounds: start() seeks to startPos when it is positive; endPos is the read allowance checked in updateBuffer().
+ */
+ private final long startPos;
+ private final long endPos;
+
+ private long streamPos; // seekable.getPos() sampled at the top of updateBuffer(), i.e. before the refill
+
+ private final Seekable seekable; // the same object as input, viewed as a Seekable
+ private final FSDataInputStream inputFS; // input as an FSDataInputStream, or null when it is not one
+ private final InputStream input;
+
+ private final DrillBuf buffer;
+ private final ByteBuffer underlyingBuffer; // NIO view obtained from buffer.nioBuffer(0, buffer.capacity())
+ private final long bStart; // buffer.memoryAddress()
+ private final long bStartMinus1; // bStart - 1, so the 1-based bufferPtr can be added to it directly
+
+ private final boolean bufferReadable; // true when inputFS wraps a ByteBufferReadable stream; read() then fills underlyingBuffer directly
+
+ /**
+ * Pending line-separator prefix, or -1 when nothing is pending. Set by updateLengthBasedOnConstraint() to an
+ * index into lineSeparator; the next read() writes lineSeparator[0..remByte] at the head of the buffer and resets it.
+ */
+ private int remByte = -1;
+
+ /**
+ * The current position in the buffer. 1-based: updateBuffer() resets it to 1 and bytes are read at bStartMinus1 + bufferPtr.
+ */
+ public int bufferPtr;
+
+ /**
+ * The quantity of valid data in the buffer; -1 before the first fill and again once no more data is available.
+ */
+ public int length = -1;
+
+ private boolean endFound = false; // set by updateLengthBasedOnConstraint() once the end of this split has been located
+
+ /**
+ * Creates a reader over {@code input} for the byte range {@code startPos}..{@code endPos}.
+ * The newline sequence and its normalized replacement byte are taken from {@code settings};
+ * {@code readBuffer} supplies the memory that every refill writes into.
+ * {@code input} must implement {@link Seekable}, and a {@link CompressionInputStream} is only
+ * accepted with {@code startPos == 0}; either violation fails the precondition checks below
+ * (Guava's checkArgument, i.e. an IllegalArgumentException).
+ */
+ public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+ this.lineSeparator = settings.getNewLineDelimiter();
+ byte normalizedLineSeparator = settings.getNormalizedNewLine();
+ Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
+ boolean isCompressed = input instanceof CompressionInputStream;
+ Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
+
+ // Splits aren't allowed with compressed data: the split length is the compressed size, so honoring it would normally end the read prematurely.
+ if (isCompressed && endPos > 0) {
+ endPos = Long.MAX_VALUE; // effectively removes the end constraint for compressed input
+ }
+
+ this.input = input;
+ this.seekable = (Seekable) input; // safe: guarded by the instanceof precondition above
+ this.settings = settings;
+
+ if (input instanceof FSDataInputStream) {
+ this.inputFS = (FSDataInputStream) input;
+ this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
+ } else {
+ this.inputFS = null;
+ this.bufferReadable = false;
+ }
+
+ this.startPos = startPos;
+ this.endPos = endPos;
+
+ this.normalizedLineSeparator = normalizedLineSeparator;
+
+ this.buffer = readBuffer;
+ this.bStart = buffer.memoryAddress();
+ this.bStartMinus1 = bStart -1;
+ this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
+ }
+
+ /**
+ * Positions the input for reading: seeks to startPos when it is positive, then fills the buffer. If data was
+ * read and either startPos is positive or settings.isSkipFirstLine() is set, skips ahead by one line.
+ * @throws IOException for input file read errors
+ */
+ final void start() throws IOException {
+ lineCount = 0;
+ if(startPos > 0){
+ seekable.seek(startPos);
+ }
+
+ updateBuffer();
+ if (length > 0) {
+ if (startPos > 0 || settings.isSkipFirstLine()) {
+
+ // move to next full record.
+ skipLines(1);
+ }
+ }
+ }
+
+
+ /**
+ * Placeholder for reporting the bytes consumed since the current record started. Stream rewind is
+ * not supported, so nothing is captured; the method always returns a single space (" ").
+ *
+ * @return the constant string " "
+ * @throws IOException declared in the signature; the body shown here never throws it
+ */
+ public String getStringSinceMarkForError() throws IOException {
+ return " ";
+ }
+
+ // Position computed as the stream offset sampled at the last refill plus the current (1-based) buffer pointer.
+ long getPos() {
+ return streamPos + bufferPtr;
+ }
+
+ public void mark() { } // no-op: the body is empty
+
+ /**
+ * Reads more bytes from the stream into underlyingBuffer and stores the read call's return value in length.
+ * Reads straight into the NIO buffer when bufferReadable is set; otherwise goes through a temporary byte[]. Pending remByte bytes are written first.
+ *
+ * @throws IOException for input file read errors
+ */
+ private void read() throws IOException {
+ if (bufferReadable) {
+ // Re-insert the held-back line-separator prefix (lineSeparator[0..remByte]) ahead of the new data.
+ if (remByte != -1) {
+ for (int i = 0; i <= remByte; i++) {
+ underlyingBuffer.put(lineSeparator[i]);
+ }
+ remByte = -1;
+ }
+ length = inputFS.read(underlyingBuffer); // NOTE(review): length is the read's return value only; the bytes put above are not added to it — confirm
+
+ } else {
+ byte[] b = new byte[underlyingBuffer.capacity()]; // fresh full-capacity scratch array on every call
+ if (remByte != -1){
+ int remBytesNum = remByte + 1;
+ System.arraycopy(lineSeparator, 0, b, 0, remBytesNum); // held-back separator prefix goes first
+ length = input.read(b, remBytesNum, b.length - remBytesNum);
+ remByte = -1;
+ } else {
+ length = input.read(b);
+ }
+ underlyingBuffer.put(b); // NOTE(review): copies the whole scratch array, not just the bytes read; length alone marks what is valid
+ }
+ }
+
+
+ /**
+ * Refills the buffer and resets bufferPtr to 1. Applies the split-end constraint when the data read reaches endPos; once endFound is set it only sets length to -1.
+ *
+ * @throws IOException for input file read errors
+ */
+ private void updateBuffer() throws IOException {
+ streamPos = seekable.getPos();
+ underlyingBuffer.clear();
+
+ if (endFound) {
+ length = -1; // the split's final line was already delivered: report end of data without reading
+ return;
+ }
+
+ read();
+
+ // Check our read allowance: this fill reaches or passes the end of the split.
+ if (streamPos + length >= this.endPos) {
+ updateLengthBasedOnConstraint();
+ }
+
+ charCount += bufferPtr; // fold the position reached in the previous buffer into the running total
+ bufferPtr = 1; // positions are 1-based; see bStartMinus1
+
+ buffer.writerIndex(underlyingBuffer.limit());
+ buffer.readerIndex(underlyingBuffer.position());
+ }
+
+ /**
+ * Called when the current fill reaches endPos. Scans from the buffer offset that corresponds to endPos for the
+ * end of a line separator; on a hit, truncates length to end just after it and sets endFound. If the scan
+ * window runs past the valid data first, records remByte and shortens length instead (consumed by read()).
+ */
+ private void updateLengthBasedOnConstraint() {
+ final long max = bStart + length; // address one past the last valid byte of this fill
+ for (long m = bStart + (endPos - streamPos); m < max; m++) {
+ for (int i = 0; i < lineSeparator.length; i++) {
+ long mPlus = m + i;
+ if (mPlus < max) {
+ // we found a line separator and don't need to consult the next byte.
+ if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) { // NOTE(review): only the final separator byte is compared; earlier bytes are not verified — confirm
+ length = (int) (mPlus - bStart) + 1;
+ endFound = true;
+ return;
+ }
+ } else {
+ // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read.
+ remByte = i; // NOTE(review): read() later re-inserts lineSeparator[0..i] (i + 1 bytes) while only i bytes are dropped below — confirm
+ length = length - i;
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Gets the next byte from the stream and maintains the line count: when the byte starts a full lineSeparator
+ * sequence, the sequence is consumed and normalizedLineSeparator is returned instead. Throws {@link StreamFinishedPseudoException} when the stream has run out of bytes.
+ *
+ * @return next byte from stream, or the normalized newline byte when a line separator was matched
+ * @throws IOException for input file read errors
+ */
+ public final byte nextChar() throws IOException {
+ byte byteChar = nextCharNoNewLineCheck();
+ int bufferPtrTemp = bufferPtr - 1; // 0-based index of the byte that follows the one just returned
+ if (byteChar == lineSeparator[0]) {
+ for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
+ if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) {
+ return byteChar; // only a partial match: hand the byte back unchanged
+ }
+ }
+
+ lineCount++;
+ byteChar = normalizedLineSeparator;
+
+ // we don't need to update buffer position if line separator is one byte long
+ if (lineSeparator.length > 1) {
+ bufferPtr += (lineSeparator.length - 1); // step over the remaining separator bytes
+ if (bufferPtr >= length) {
+ if (length != -1) {
+ updateBuffer();
+ } else {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+ }
+ }
+ }
+
+ return byteChar;
+ }
+
+ /**
+ * Gets the next byte from the stream without maintaining the line count. Throws a
+ * StreamFinishedPseudoException when the stream has run out of bytes.
+ *
+ * @return next byte from stream.
+ * @throws IOException for input file read errors
+ */
+ public final byte nextCharNoNewLineCheck() throws IOException {
+
+ if (length == -1) {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+
+ rangeCheck(buffer, bufferPtr - 1, bufferPtr);
+
+ byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr); // bufferPtr is 1-based, hence the bStart - 1 base address
+
+ if (bufferPtr >= length) { // that was the last valid byte of this fill: refill before returning it
+ if (length != -1) {
+ updateBuffer();
+ bufferPtr--; // cancels the increment below so the new buffer starts at bufferPtr == 1
+ } else {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+ }
+
+ bufferPtr++;
+ return byteChar;
+ }
+
+ /**
+ * Number of lines read since the start of this split (start() resets the counter to 0).
+ * @return current line count
+ */
+ public final long lineCount() {
+ return lineCount;
+ }
+
+ /**
+ * Skip forward the number of line delimiters. If you are in the middle of a line,
+ * a value of 1 will skip to the start of the next record.
+ *
+ * @param lines Number of lines to skip; values below 1 return immediately without reading.
+ * @throws IOException for input file read errors
+ * @throws IllegalArgumentException if unable to skip the requested number
+ * of lines
+ */
+ public final void skipLines(int lines) throws IOException {
+ if (lines < 1) {
+ return;
+ }
+ long expectedLineCount = this.lineCount + lines;
+
+ try {
+ do {
+ nextChar();
+ } while (lineCount < expectedLineCount);
+ if (lineCount < lines) { // NOTE(review): the loop exits only once lineCount >= expectedLineCount (>= lines while lineCount is non-negative), so this looks unreachable — confirm
+ throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+ }
+ } catch (EOFException ex) { // NOTE(review): presumably StreamFinishedPseudoException is an EOFException subtype — verify against its declaration
+ throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+ }
+ }
+
+ // Running character count: the total folded in at earlier refills plus the position in the current buffer.
+ public final long charCount() {
+ return charCount + bufferPtr;
+ }
+
+ // Returns the same value as lineCount().
+ public long getLineCount() {
+ return lineCount;
+ }
+
+ // Closes the wrapped input stream.
+ public void close() throws IOException{
+ input.close();
+ }
+}