aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java165
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java269
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java62
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java267
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java137
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java29
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java368
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java88
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java126
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java305
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java508
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java22
13 files changed, 2373 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
new file mode 100644
index 000000000..0ed3155a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Original version of the "compliant" text reader. This is version 2 of
+ * the text reader. This version is retained for temporary backward
+ * compatibility as we productize the newer version 3 based on the
+ * row set framework.
+ * <p>
+ * TODO: Remove the files in this package and move the files from the
+ * "v3" sub-package here once the version 3 implementation stabilizes.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
new file mode 100644
index 000000000..6bf0bb69c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+
+public abstract class BaseFieldOutput extends TextOutput {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class);
+  private static final int MAX_FIELD_LENGTH = 1024 * 64;
+
+  // Index of the field being appended; -1 before the first field of a record.
+  protected int currentFieldIndex = -1;
+  // Number of bytes stored in fieldBytes for the current field.
+  protected int currentDataPointer;
+  // Set by startField(); cleared by endField() and startRecord().
+  private boolean fieldOpen = true;
+  // Holds the bytes of one field; stays null when no field is projected.
+  protected byte[] fieldBytes;
+  protected final RowSetLoader writer;
+  private final boolean[] projectionMask;
+  protected final int maxField;
+  protected boolean fieldProjected;
+
+  /**
+   * Initialize the field output for one of three scenarios:
+   * <ul>
+   * <li>SELECT all: SELECT *, SELECT columns. Indicated by a max field
+   * other than -1 and no projection mask.</li>
+   * <li>SELECT none: SELECT COUNT(*), etc. Indicated by a max field
+   * of -1.</li>
+   * <li>SELECT a, b, c indicated by a non-null projection mask that
+   * identifies the indexes of the fields to be selected. In this case,
+   * this constructor computes the maximum field.</li>
+   * </ul>
+   *
+   * @param writer Row set writer that provides access to the writer for
+   * each column
+   * @param maxField the index of the last field to store. May be -1 if no
+   * fields are to be stored. Recomputed if the projection mask is set
+   * @param projectionMask a boolean array indicating which fields are
+   * to be projected to the output. Optional
+   */
+  public BaseFieldOutput(RowSetLoader writer, int maxField, boolean[] projectionMask) {
+    this.writer = writer;
+    this.projectionMask = projectionMask;
+
+    // If no projection mask is defined, then we want all columns
+    // up to the max field, which may be -1 if we want to select
+    // nothing.
+
+    if (projectionMask == null) {
+      this.maxField = maxField;
+    } else {
+
+      // Otherwise, use the projection mask to determine
+      // which fields are to be projected. (The file may well
+      // contain more than the projected set.)
+
+      int end = projectionMask.length - 1;
+      while (end >= 0 && ! projectionMask[end]) {
+        end--;
+      }
+      this.maxField = end;
+    }
+
+    // If we project at least one field, allocate a buffer. Test the
+    // resolved field (not the argument): a mask overrides the argument.
+    if (this.maxField >= 0) {
+      fieldBytes = new byte[MAX_FIELD_LENGTH];
+    }
+  }
+
+  /**
+   * Start a new record. Resets the field pointers.
+   */
+  @Override
+  public void startRecord() {
+    currentFieldIndex = -1;
+    fieldOpen = false;
+    writer.start();
+  }
+
+  @Override
+  public void startField(int index) {
+    assert index == currentFieldIndex + 1;
+    currentFieldIndex = index;
+    currentDataPointer = 0;
+    fieldOpen = true;
+
+    // Figure out if this field is projected.
+
+    if (projectionMask == null) {
+      fieldProjected = currentFieldIndex <= maxField;
+    } else if (currentFieldIndex >= projectionMask.length) {
+      fieldProjected = false;
+    } else {
+      fieldProjected = projectionMask[currentFieldIndex];
+    }
+  }
+
+  @Override
+  public void append(byte data) {
+    // Bytes of fields that are not projected are dropped.
+    if (! fieldProjected) {
+      return;
+    }
+    if (currentDataPointer >= MAX_FIELD_LENGTH - 1) {
+      throw UserException
+        .unsupportedError()
+        .message("Text column is too large.")
+        .addContext("Column", currentFieldIndex)
+        .addContext("Limit", MAX_FIELD_LENGTH)
+        .build(logger);
+    }
+
+    fieldBytes[currentDataPointer++] = data;
+  }
+
+  @Override
+  public boolean endField() {
+    // Returns true while the current field index is below maxField.
+    fieldOpen = false;
+    return currentFieldIndex < maxField;
+  }
+
+  @Override
+  public boolean endEmptyField() {
+    return endField();
+  }
+
+  @Override
+  public void finishRecord() {
+    if (fieldOpen) {
+      endField();
+    }
+    writer.save();
+  }
+
+  @Override
+  public long getRecordCount() {
+    return writer.rowCount();
+  }
+
+  @Override
+  public boolean isFull() {
+    return writer.isFull();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
new file mode 100644
index 000000000..e489003c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * New text reader, complies with the RFC 4180 standard for text/csv files
+ */
+public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextBatchReader.class);
+
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+  private static final int READ_BUFFER = 1024 * 1024;
+  private static final int WHITE_SPACE_BUFFER = 64 * 1024;
+
+  // settings to be used while parsing
+  private final TextParsingSettingsV3 settings;
+  // Chunk of the file to be read by this reader
+  private final FileSplit split;
+  // text reader implementation
+  private TextReader reader;
+  // input buffer
+  private DrillBuf readBuffer;
+  // working buffer to handle whitespaces
+  private DrillBuf whitespaceBuffer;
+  private final DrillFileSystem dfs;
+
+  private RowSetLoader writer;
+
+  public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) {
+    this.split = split;
+    this.settings = settings;
+    this.dfs = dfs;
+
+    // Validate. Otherwise, these problems show up later as a data
+    // read error which is very confusing.
+
+    if (settings.getNewLineDelimiter().length == 0) {
+      throw UserException
+        .validationError()
+        .message("The text format line delimiter cannot be blank.")
+        .build(logger);
+    }
+  }
+
+  /**
+   * Performs the initial setup required for the record reader.
+   * Allocates the read buffers, defines the table schema and opens the
+   * text reader on this reader's file split.
+   * @param schemaNegotiator supplies the operator context (and its allocator)
+   * and receives the batch size and the table schema
+   * @return true if the reader was opened, false if no output was created
+   */
+  @Override
+  public boolean open(ColumnsSchemaNegotiator schemaNegotiator) {
+    final OperatorContext context = schemaNegotiator.context();
+
+    // Note: DO NOT use managed buffers here. They remain in existence
+    // until the fragment is shut down. The buffers here are large.
+    // If we scan 1000 files, and allocate 1 MB for each, we end up
+    // holding onto 1 GB of memory in managed buffers.
+    // Instead, we allocate the buffers explicitly, and must free
+    // them.
+
+    readBuffer = context.getAllocator().buffer(READ_BUFFER);
+    whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
+
+    // TODO: Set this based on size of record rather than
+    // absolute count.
+
+    schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH);
+
+    // setup Output, Input, and Reader
+    try {
+      TextOutput output;
+
+      if (settings.isHeaderExtractionEnabled()) {
+        output = openWithHeaders(schemaNegotiator);
+      } else {
+        output = openWithoutHeaders(schemaNegotiator);
+      }
+      if (output == null) {
+        return false;
+      }
+      openReader(output);
+      return true;
+    } catch (final IOException e) {
+      throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
+    }
+  }
+
+  /**
+   * Extract header and use that to define the reader schema.
+   */
+
+  private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException {
+    final String [] fieldNames = extractHeader();
+    if (fieldNames == null) {
+      return null;
+    }
+    final TupleMetadata schema = new TupleSchema();
+    for (final String colName : fieldNames) {
+      schema.addColumn(MetadataUtils.newScalar(colName, MinorType.VARCHAR, DataMode.REQUIRED));
+    }
+    schemaNegotiator.setTableSchema(schema, true);
+    writer = schemaNegotiator.build().writer();
+    return new FieldVarCharOutput(writer);
+  }
+
+  /**
+   * When no headers, create a single array column "columns".
+   */
+
+  private TextOutput openWithoutHeaders(
+      ColumnsSchemaNegotiator schemaNegotiator) {
+    final TupleMetadata schema = new TupleSchema();
+    schema.addColumn(MetadataUtils.newScalar(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR, DataMode.REPEATED));
+    schemaNegotiator.setTableSchema(schema, true);
+    writer = schemaNegotiator.build().writer();
+    return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes());
+  }
+
+  private void openReader(TextOutput output) throws IOException {
+    logger.trace("Opening file {}", split.getPath());
+    final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath());
+    final TextInput input = new TextInput(settings, stream, readBuffer,
+        split.getStart(), split.getStart() + split.getLength());
+
+    // setup Reader using Input and Output
+    reader = new TextReader(settings, input, output, whitespaceBuffer);
+    reader.start();
+  }
+
+  /**
+   * Extracts header from text file.
+   * Currently it is assumed to be first line if headerExtractionEnabled is set to true
+   * TODO: enhance to support more common header patterns
+   * @return field name strings
+   */
+
+  private String [] extractHeader() throws IOException {
+    assert settings.isHeaderExtractionEnabled();
+
+    // don't skip header in case skipFirstLine is set true
+    settings.setSkipFirstLine(false);
+
+    final HeaderBuilder hOutput = new HeaderBuilder(split.getPath());
+
+    // setup Input using InputStream
+    // we should read the file header irrespective of the split given to this reader
+    final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath());
+    final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength());
+
+    // setup Reader using Input and Output
+    this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer);
+    reader.start();
+
+    // extract first row only
+    reader.parseNext();
+
+    // grab the field names from output
+    final String [] fieldNames = hOutput.getHeaders();
+
+    // cleanup and set to skip the first line next time we read input
+    reader.close();
+    settings.setSkipFirstLine(true);
+
+    readBuffer.clear();
+    whitespaceBuffer.clear();
+    return fieldNames;
+  }
+
+  /**
+   * Generates the next record batch.
+   * @return true if more data may follow; false on the batch that reached
+   * the end of the input, or when the batch holds no rows
+   */
+  @Override
+  public boolean next() {
+    reader.resetForNextBatch();
+
+    try {
+      boolean more = false;
+      while (! writer.isFull()) {
+        more = reader.parseNext();
+        if (! more) {
+          break;
+        }
+      }
+      reader.finishBatch();
+
+      // Return false on the batch that hits EOF. The scan operator
+      // knows to process any rows in this final batch.
+
+      return more && writer.rowCount() > 0;
+    } catch (IOException | TextParsingException e) {
+      if (e.getCause() instanceof UserException) {
+        throw (UserException) e.getCause();
+      }
+      throw UserException.dataReadError(e)
+          .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
+            split.getPath(), reader.getPos())
+          .build(logger);
+    }
+  }
+
+  /**
+   * Cleanup state once we are finished processing all the records.
+   * This would internally close the input stream we are reading from.
+   */
+  @Override
+  public void close() {
+    // Release the buffers allocated above. Double-check to handle
+    // unexpected multiple calls to close().
+
+    if (readBuffer != null) {
+      readBuffer.release();
+      readBuffer = null;
+    }
+    if (whitespaceBuffer != null) {
+      whitespaceBuffer.release();
+      whitespaceBuffer = null;
+    }
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (final IOException e) {
+        logger.warn("Exception while closing stream.", e);
+      } finally {
+        reader = null;
+      }
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
new file mode 100644
index 000000000..df48a5548
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Class is responsible for generating record batches for text file inputs. We generate
+ * a record batch with a set of varchar vectors. A varchar vector contains all the field
+ * values for a given column. Each record is a single value within each vector of the set.
+ */
+class FieldVarCharOutput extends BaseFieldOutput {
+
+  /**
+   * Creates an output that stores each projected field of a record
+   * through the scalar column writer found at the field's index.
+   *
+   * @param writer row set loader whose tuple schema provides the
+   * columns and the projection status of each column
+   */
+  public FieldVarCharOutput(RowSetLoader writer) {
+    super(writer, TextReader.MAXIMUM_NUMBER_COLUMNS, makeMask(writer));
+  }
+
+  /**
+   * Builds the projection mask: one entry per schema column, true if projected.
+   */
+  private static boolean[] makeMask(RowSetLoader writer) {
+    final TupleMetadata tupleSchema = writer.tupleSchema();
+    final int columnCount = tupleSchema.size();
+    final boolean[] mask = new boolean[columnCount];
+    for (int col = 0; col < columnCount; col++) {
+      mask[col] = tupleSchema.metadata(col).isProjected();
+    }
+    return mask;
+  }
+
+  @Override
+  public boolean endField() {
+    // Only a projected field has bytes to hand to its column writer.
+    if (fieldProjected) {
+      writer.scalar(currentFieldIndex).setBytes(fieldBytes, currentDataPointer);
+    }
+    return super.endField();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
new file mode 100644
index 000000000..62eafc898
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+/**
+ * Text output that implements a header reader/parser.
+ * The caller parses out the characters of each header;
+ * this class assembles UTF-8 bytes into Unicode characters,
+ * fixes invalid characters (those not legal for SQL symbols),
+ * and maps duplicate names to unique names.
+ * <p>
+ * That is, this class is as permissive as possible with file
+ * headers to avoid spurious query failures for trivial reasons.
+ */
+
+// Note: this class uses Java heap strings and the usual Java
+// convenience classes. Since we do heavy Unicode string operations,
+// and read a single row, there is no good reason to try to use
+// value vectors and direct memory for this task.
+
+public class HeaderBuilder extends TextOutput {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class);
+
+  /**
+   * Maximum Drill symbol length, as enforced for headers.
+   * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+   * identifier documentation</a>
+   */
+  // TODO: Replace with the proper constant, if available
+  public static final int MAX_HEADER_LEN = 1024;
+
+  /**
+   * Prefix used to replace non-alphabetic characters at the start of
+   * a column name. For example, $foo becomes col_foo. Used
+   * because SQL does not allow _foo.
+   */
+
+  public static final String COLUMN_PREFIX = "col_";
+
+  /**
+   * Prefix used to create numbered columns for missing
+   * headers. Typical names: column_1, column_2, ...
+   */
+
+  public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
+
+  public final Path filePath;
+  public final List<String> headers = new ArrayList<>();
+  public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
+
+  public HeaderBuilder(Path filePath) {
+    this.filePath = filePath;
+  }
+
+  @Override
+  public void startField(int index) {
+    currentField.clear();
+  }
+
+  @Override
+  public boolean endField() {
+    String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8);
+    header = validateSymbol(header);
+    headers.add(header);
+    return true;
+  }
+
+  @Override
+  public boolean endEmptyField() {
+
+    // Empty header will be rewritten to "column_<n>".
+
+    return endField();
+  }
+
+  /**
+   * Validate the header name according to the SQL lexical rules.
+   * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+   * identifier documentation</a>
+   * @param header the header name to validate
+   * @return the trimmed header if valid, else a rewritten or generated name
+   */
+  // TODO: Replace with existing code, if any.
+  private String validateSymbol(String header) {
+    header = header.trim();
+
+    // To avoid unnecessary query failures, just make up a column name
+    // if the name is missing or all blanks.
+
+    if (header.isEmpty()) {
+      return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+    }
+    if (! Character.isAlphabetic(header.charAt(0))) {
+      return rewriteHeader(header);
+    }
+    for (int i = 1; i < header.length(); i++) {
+      char ch = header.charAt(i);
+      if (! Character.isAlphabetic(ch) &&
+          ! Character.isDigit(ch) && ch != '_') {
+        return rewriteHeader(header);
+      }
+    }
+    return header;
+  }
+
+  /**
+   * Given an invalid header, rewrite it to replace illegal characters
+   * with valid ones. The header won't be what the user specified,
+   * but it will be a valid SQL identifier. This solution avoids failing
+   * queries due to corrupted or invalid header data.
+   * <p>
+   * Names with invalid first characters are mapped to "col_". Example:
+   * $foo maps to col_foo. If the only character is non-alphabetic, treat
+   * the column as anonymous and create a generic name: column_4, etc.
+   * <p>
+   * This mapping could create a column that exceeds the maximum length
+   * of 1024. Since that is not really a hard limit, we just live with the
+   * extra few characters.
+   *
+   * @param header the original header
+   * @return the rewritten header, valid for SQL
+   */
+
+  private String rewriteHeader(String header) {
+    final StringBuilder buf = new StringBuilder();
+
+    // If starts with non-alphabetic, can't map the character to
+    // underscore, so just tack on a prefix.
+
+    char ch = header.charAt(0);
+    if (Character.isAlphabetic(ch)) {
+      buf.append(ch);
+    } else if (Character.isDigit(ch)) {
+      buf.append(COLUMN_PREFIX);
+      buf.append(ch);
+
+      // For the strange case of only one character, format
+      // the same as an empty header.
+
+    } else if (header.length() == 1) {
+      return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+    } else {
+      buf.append(COLUMN_PREFIX);
+    }
+
+    // Convert all remaining invalid characters to underscores
+
+    for (int i = 1; i < header.length(); i++) {
+      ch = header.charAt(i);
+      if (Character.isAlphabetic(ch) ||
+          Character.isDigit(ch) || ch == '_') {
+        buf.append(ch);
+      } else {
+        buf.append("_");
+      }
+    }
+    return buf.toString();
+  }
+
+  @Override
+  public void append(byte data) {
+
+    // Ensure the data fits. Note that, if the name is Unicode, the actual
+    // number of characters might be less than the limit even though the
+    // byte count exceeds the limit. Fixing this, in general, would require
+    // a buffer four times larger, so we leave that as a later improvement
+    // if ever needed.
+
+    try {
+      currentField.put(data);
+    } catch (BufferOverflowException e) {
+      throw UserException.dataReadError()
+        .message("Column exceeds maximum length of %d", MAX_HEADER_LEN)
+        .addContext("File Path", filePath.toString())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void finishRecord() {
+    if (headers.isEmpty()) {
+      throw UserException.dataReadError()
+        .message("The file must define at least one header.")
+        .addContext("File Path", filePath.toString())
+        .build(logger);
+    }
+
+    // Force headers to be unique. Keys are case-folded with Locale.ROOT so
+    // that duplicate detection does not vary with the JVM default locale.
+    final Set<String> idents = new HashSet<>();
+    for (int i = 0; i < headers.size(); i++) {
+      String header = headers.get(i);
+      String key = header.toLowerCase(java.util.Locale.ROOT);
+
+      // Is the header a duplicate?
+
+      if (idents.contains(key)) {
+
+        // Make header unique by appending a suffix.
+        // This loop must end because we have a finite
+        // number of headers.
+        // The original column is assumed to be "1", so
+        // the first duplicate is "2", and so on.
+        // Note that this will map columns of the form:
+        // "col,col,col_2,col_2_2" to
+        // "col", "col_2", "col_2_2", "col_2_2_2".
+        // No mapping scheme is perfect...
+
+        for (int l = 2; ; l++) {
+          final String rewritten = header + "_" + l;
+          key = rewritten.toLowerCase(java.util.Locale.ROOT);
+          if (! idents.contains(key)) {
+            headers.set(i, rewritten);
+            break;
+          }
+        }
+      }
+      idents.add(key);
+    }
+  }
+
+  @Override
+  public void startRecord() { }
+
+  public String[] getHeaders() {
+
+    // Just return the headers: any needed checks were done in
+    // finishRecord()
+
+    final String array[] = new String[headers.size()];
+    return headers.toArray(array);
+  }
+
+  // Not used.
+  @Override
+  public long getRecordCount() { return 0; }
+
+  @Override
+  public boolean isFull() { return false; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
new file mode 100644
index 000000000..13b44509e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Class is responsible for generating record batches for text file inputs. We generate
+ * a record batch with a single vector of type repeated varchar vector. Each record is a single
+ * value within the vector containing all the fields in the record as individual array elements.
+ * <p>
+ * The per-field state used below ({@code writer}, {@code maxField},
+ * {@code currentFieldIndex}, {@code fieldProjected}, {@code fieldBytes} and
+ * {@code currentDataPointer}) is not declared in this class; it is inherited
+ * from {@link BaseFieldOutput}.
+ */
+public class RepeatedVarCharOutput extends BaseFieldOutput {
+  // Log under this class's own name rather than the base class's.
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedVarCharOutput.class);
+
+  /** Writer for the individual elements of the array column. */
+  private final ScalarWriter columnWriter;
+
+  /** Writer for the array column at index 0 of the row. */
+  private final ArrayWriter arrayWriter;
+
+  /**
+   * Provide the row set loader (which must have just one repeated Varchar
+   * column) and an optional array projection mask.
+   *
+   * @param loader row set loader; its column 0 is used as the array column
+   * @param projectionMask per-element projection flags, or {@code null}
+   * when no element-level projection was requested
+   */
+
+  public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) {
+    super(loader,
+        maxField(loader, projectionMask),
+        projectionMask);
+    arrayWriter = writer.array(0);
+    columnWriter = arrayWriter.scalar();
+  }
+
+  /**
+   * Computes the index of the last field that must be read from each record.
+   *
+   * @param loader row set loader used to check whether column 0 is projected
+   * @param projectionMask per-element projection flags, or {@code null}
+   * @return -1 if column 0 is not projected;
+   * {@link TextReader#MAXIMUM_NUMBER_COLUMNS} if there is no mask; otherwise
+   * the index of the last {@code true} entry in the mask (-1 if none)
+   */
+  private static int maxField(RowSetLoader loader, boolean[] projectionMask) {
+
+    // If the one and only field (`columns`) is not selected, then this
+    // is a COUNT(*) or similar query. Select nothing.
+
+    if (! loader.tupleSchema().metadata(0).isProjected()) {
+      return -1;
+    }
+
+    // If this is SELECT * or SELECT `columns` query, project all
+    // possible fields.
+
+    if (projectionMask == null) {
+      return TextReader.MAXIMUM_NUMBER_COLUMNS;
+    }
+
+    // Else, this is a SELECT columns[x], columns[y], ... query.
+    // Project only the requested element members (fields).
+
+    int end = projectionMask.length - 1;
+    while (end >= 0 && ! projectionMask[end]) {
+      end--;
+    }
+    return end;
+  }
+
+  /**
+   * Write the value into an array position. Rules:
+   * <ul>
+   * <li>If there is no projection mask, collect all columns.</li>
+   * <li>If a selection mask is present, we previously found the index
+   * of the last projection column (<tt>maxField</tt>). If the current
+   * column is beyond that number, ignore the data and stop accepting
+   * columns.</li>
+   * <li>If the column is projected, add the data to the array.</li>
+   * <li>If the column is not projected, add a blank value to the
+   * array.</li>
+   * </ul>
+   * The above ensures that we leave no holes in the portion of the
+   * array that is projected (by adding blank columns where needed),
+   * and we just ignore columns past the end of the projected part
+   * of the array. (No need to fill holes at the end.)
+   *
+   * @return {@code false} if the field lies past the last projected field,
+   * otherwise whatever {@code super.endField()} returns
+   * @throws UserException (unsupported error) if a projected field's index
+   * exceeds {@link TextReader#MAXIMUM_NUMBER_COLUMNS}
+   */
+
+  @Override
+  public boolean endField() {
+
+    // Skip the field if past the set of projected fields.
+
+    if (currentFieldIndex > maxField) {
+      return false;
+    }
+
+    // If the field is projected, save it.
+
+    if (fieldProjected) {
+
+      // Repeated var char will create as many entries as there are columns.
+      // If this would exceed the maximum, issue an error. Note that we do
+      // this only if all fields are selected; the same query will succeed if
+      // the user does a COUNT(*) or SELECT columns[x], columns[y], ...
+
+      if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
+        throw UserException
+            .unsupportedError()
+            .message("Text file contains too many fields")
+            .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS)
+            .build(logger);
+      }
+
+      // Save the field.
+
+      columnWriter.setBytes(fieldBytes, currentDataPointer);
+    } else {
+
+      // The field is not projected.
+      // Must write a value into this array position, but
+      // the value should be empty.
+
+      columnWriter.setBytes(fieldBytes, 0);
+    }
+
+    // Return whether the rest of the fields should be read.
+
+    return super.endField();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
new file mode 100644
index 000000000..70c43b7ee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+/**
+ * Marker exception signalling that the input has run out of bytes. A single
+ * shared {@link #INSTANCE} is thrown instead of creating a new exception each
+ * time; the private constructor prevents any other instance from being made.
+ */
+class StreamFinishedPseudoException extends RuntimeException {
+
+  /** The one and only instance. */
+  public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException();
+
+  /**
+   * Builds the instance with an empty message and no cause; suppression is
+   * disabled and the stack trace is left writable.
+   */
+  private StreamFinishedPseudoException() {
+    super("", null, false, true);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
new file mode 100644
index 000000000..26fade6d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
+
+/**
+ * Class that fronts an InputStream to provide a byte consumption interface.
+ * Also manages only reading lines to and from each split.
+ * <p>
+ * Bytes are pulled from the stream into a {@link DrillBuf} one buffer-load
+ * at a time and handed out one byte per call. End of input is reported by
+ * throwing the shared {@link StreamFinishedPseudoException#INSTANCE}.
+ */
+final class TextInput {
+
+  /** Byte sequence that marks the end of a line in the input. */
+  private final byte[] lineSeparator;
+
+  /** Single byte returned to callers in place of a full line separator. */
+  private final byte normalizedLineSeparator;
+  private final TextParsingSettingsV3 settings;
+
+  /** Number of line separators seen by {@link #nextChar()} since {@link #start()}. */
+  private long lineCount;
+
+  /** Running total of {@code bufferPtr} values accumulated on each buffer refill. */
+  private long charCount;
+
+  /**
+   * The starting position in the file.
+   */
+  private final long startPos;
+
+  /** Position at which this split ends; see {@link #updateLengthBasedOnConstraint()}. */
+  private final long endPos;
+
+  /** Stream position reported by {@code seekable} at the start of the latest refill. */
+  private long streamPos;
+
+  private final Seekable seekable;
+
+  /** The input cast to {@link FSDataInputStream}, or null if it is not one. */
+  private final FSDataInputStream inputFS;
+  private final InputStream input;
+
+  private final DrillBuf buffer;
+
+  /** NIO view over the whole capacity of {@code buffer}; reads land here. */
+  private final ByteBuffer underlyingBuffer;
+
+  /** Memory address of the first byte of {@code buffer}. */
+  private final long bStart;
+
+  /** {@code bStart - 1}; added to the one-based {@code bufferPtr} to address a byte. */
+  private final long bStartMinus1;
+
+  /** True when the wrapped stream can read directly into a {@link ByteBuffer}. */
+  private final boolean bufferReadable;
+
+  /**
+   * Whether there was a possible partial line separator on the previous
+   * read so we dropped it and it should be appended to next read.
+   * -1 means nothing is pending; otherwise {@link #read()} re-inserts
+   * {@code lineSeparator[0..remByte]} at the front of the next buffer.
+   */
+  private int remByte = -1;
+
+  /**
+   * The current position in the buffer. One-based: the next byte handed out
+   * is the one at offset {@code bufferPtr - 1}.
+   */
+  public int bufferPtr;
+
+  /**
+   * The quantity of valid data in the buffer. -1 means no more data.
+   */
+  public int length = -1;
+
+  /** Set once the line separator that closes this split has been located. */
+  private boolean endFound = false;
+
+  /**
+   * Creates a new instance. The line separator and the normalized newline
+   * byte are taken from the settings.
+   *
+   * @param settings supplies the line separator bytes, the normalized
+   * newline byte and the skip-first-line flag
+   * @param input stream to read; must also implement {@link Seekable}
+   * @param readBuffer buffer that receives the bytes read from the stream
+   * @param startPos position in the stream at which this split starts
+   * @param endPos position in the stream at which this split ends; replaced
+   * by {@code Long.MAX_VALUE} for compressed input when greater than zero
+   * @throws IllegalArgumentException if the input is not {@link Seekable},
+   * or if it is compressed and {@code startPos} is not zero
+   */
+  public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+    this.lineSeparator = settings.getNewLineDelimiter();
+    Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
+    boolean isCompressed = input instanceof CompressionInputStream;
+    Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
+
+    // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely.
+    if (isCompressed && endPos > 0) {
+      endPos = Long.MAX_VALUE;
+    }
+
+    this.input = input;
+    this.seekable = (Seekable) input;
+    this.settings = settings;
+
+    if (input instanceof FSDataInputStream) {
+      this.inputFS = (FSDataInputStream) input;
+      this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
+    } else {
+      this.inputFS = null;
+      this.bufferReadable = false;
+    }
+
+    this.startPos = startPos;
+    this.endPos = endPos;
+
+    this.normalizedLineSeparator = settings.getNormalizedNewLine();
+
+    this.buffer = readBuffer;
+    this.bStart = buffer.memoryAddress();
+    this.bStartMinus1 = bStart -1;
+    this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
+  }
+
+  /**
+   * Position the input for the first read. Seeks to the split start when it
+   * is non-zero, loads the first buffer and, if the split does not start at
+   * zero or the skip-first-line setting is enabled, skips one line so that
+   * reading begins at a complete line.
+   *
+   * @throws IOException for input file read errors
+   */
+  final void start() throws IOException {
+    lineCount = 0;
+    if(startPos > 0){
+      seekable.seek(startPos);
+    }
+
+    updateBuffer();
+    if (length > 0) {
+      if (startPos > 0 || settings.isSkipFirstLine()) {
+
+        // move to next full record.
+        skipLines(1);
+      }
+    }
+  }
+
+
+  /**
+   * Helper method meant to return the most recent characters consumed since
+   * the last record started. Stream rewind is not supported, so for now this
+   * is a placeholder that always returns a single space.
+   *
+   * @return currently always {@code " "}
+   * @throws IOException declared for callers; not thrown by the current
+   * implementation
+   */
+  public String getStringSinceMarkForError() throws IOException {
+    return " ";
+  }
+
+  /**
+   * @return the stream position recorded at the last buffer refill plus the
+   * current buffer pointer
+   */
+  long getPos() {
+    return streamPos + bufferPtr;
+  }
+
+  /** No-op: marking is not supported by this input. */
+  public void mark() { }
+
+  /**
+   * Read some more bytes from the stream. Uses the zero copy interface if available.
+   * Otherwise, does byte copy. Any pending partial line separator (see
+   * {@code remByte}) is placed at the front of the buffer first.
+   *
+   * @throws IOException for input file read errors
+   */
+  private void read() throws IOException {
+    if (bufferReadable) {
+
+      if (remByte != -1) {
+        for (int i = 0; i <= remByte; i++) {
+          underlyingBuffer.put(lineSeparator[i]);
+        }
+        remByte = -1;
+      }
+      // NOTE(review): length is the count returned by this read call only;
+      // it does not include the separator bytes put above. Confirm intended.
+      length = inputFS.read(underlyingBuffer);
+
+    } else {
+      byte[] b = new byte[underlyingBuffer.capacity()];
+      if (remByte != -1){
+        int remBytesNum = remByte + 1;
+        System.arraycopy(lineSeparator, 0, b, 0, remBytesNum);
+        length = input.read(b, remBytesNum, b.length - remBytesNum);
+        remByte = -1;
+      } else {
+        length = input.read(b);
+      }
+      // The whole array is copied; only the first `length` bytes are valid.
+      underlyingBuffer.put(b);
+    }
+  }
+
+
+  /**
+   * Read more data into the buffer. Will also manage split end conditions.
+   * Once the end of the split has been found, no further reads are issued
+   * and {@code length} is set to -1.
+   *
+   * @throws IOException for input file read errors
+   */
+  private void updateBuffer() throws IOException {
+    streamPos = seekable.getPos();
+    underlyingBuffer.clear();
+
+    if (endFound) {
+      length = -1;
+      return;
+    }
+
+    read();
+
+    // check our data read allowance.
+    if (streamPos + length >= this.endPos) {
+      updateLengthBasedOnConstraint();
+    }
+
+    charCount += bufferPtr;
+    bufferPtr = 1;
+
+    buffer.writerIndex(underlyingBuffer.limit());
+    buffer.readerIndex(underlyingBuffer.position());
+  }
+
+  /**
+   * Checks to see if we can go over the end of our bytes constraint on the data. If so,
+   * adjusts so that we can only read to the last character of the first line that crosses
+   * the split boundary.
+   */
+  private void updateLengthBasedOnConstraint() {
+    final long max = bStart + length;
+    for (long m = bStart + (endPos - streamPos); m < max; m++) {
+      for (int i = 0; i < lineSeparator.length; i++) {
+        long mPlus = m + i;
+        if (mPlus < max) {
+          // we found a line separator and don't need to consult the next byte.
+          if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) {
+            length = (int) (mPlus - bStart) + 1;
+            endFound = true;
+            return;
+          }
+        } else {
+          // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read.
+          remByte = i;
+          length = length - i;
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Get next byte from stream. Also maintains the current line count. Will throw a
+   * {@link StreamFinishedPseudoException} when the stream has run out of bytes.
+   * When a complete line separator is found it is consumed and the
+   * normalized newline byte is returned in its place.
+   *
+   * @return next byte from stream.
+   * @throws IOException for input file read errors
+   */
+  public final byte nextChar() throws IOException {
+    byte byteChar = nextCharNoNewLineCheck();
+    // bufferPtr is one-based, so bufferPtr - 1 is the zero-based index of
+    // the byte that follows the one just read.
+    int bufferPtrTemp = bufferPtr - 1;
+    if (byteChar == lineSeparator[0]) {
+      for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
+        if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) {
+          return byteChar;
+        }
+      }
+
+      lineCount++;
+      byteChar = normalizedLineSeparator;
+
+      // we don't need to update buffer position if line separator is one byte long
+      if (lineSeparator.length > 1) {
+        bufferPtr += (lineSeparator.length - 1);
+        if (bufferPtr >= length) {
+          if (length != -1) {
+            updateBuffer();
+          } else {
+            throw StreamFinishedPseudoException.INSTANCE;
+          }
+        }
+      }
+    }
+
+    return byteChar;
+  }
+
+  /**
+   * Get next byte from stream. Does not maintain any line count. Will throw a
+   * StreamFinishedPseudoException when the stream has run out of bytes.
+   *
+   * @return next byte from stream.
+   * @throws IOException for input file read errors
+   */
+  public final byte nextCharNoNewLineCheck() throws IOException {
+
+    if (length == -1) {
+      throw StreamFinishedPseudoException.INSTANCE;
+    }
+
+    rangeCheck(buffer, bufferPtr - 1, bufferPtr);
+
+    byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr);
+
+    if (bufferPtr >= length) {
+      if (length != -1) {
+        // Buffer exhausted: refill. updateBuffer() resets bufferPtr to 1, so
+        // step back once to cancel the increment below.
+        updateBuffer();
+        bufferPtr--;
+      } else {
+        throw StreamFinishedPseudoException.INSTANCE;
+      }
+    }
+
+    bufferPtr++;
+    return byteChar;
+  }
+
+  /**
+   * Number of lines read since the start of this split.
+   * @return current line count
+   */
+  public final long lineCount() {
+    return lineCount;
+  }
+
+  /**
+   * Skip forward the number of line delimiters. If you are in the middle of a line,
+   * a value of 1 will skip to the start of the next record. Values below 1
+   * are ignored.
+   * <p>
+   * If the input runs out before the requested number of lines has been
+   * skipped, the {@link StreamFinishedPseudoException} thrown by
+   * {@link #nextChar()} propagates to the caller unchanged.
+   *
+   * @param lines Number of lines to skip.
+   * @throws IOException for input file read errors
+   * @throws IllegalArgumentException if reading fails with an
+   * {@link EOFException} before the requested number of lines was skipped
+   */
+  public final void skipLines(int lines) throws IOException {
+    if (lines < 1) {
+      return;
+    }
+    long expectedLineCount = this.lineCount + lines;
+
+    try {
+      // The loop exits normally only once lineCount has reached
+      // expectedLineCount, so no further check is needed afterwards.
+      do {
+        nextChar();
+      } while (lineCount < expectedLineCount);
+    } catch (EOFException ex) {
+      throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached", ex);
+    }
+  }
+
+  /**
+   * @return the accumulated character count plus the current buffer pointer
+   */
+  public final long charCount() {
+    return charCount + bufferPtr;
+  }
+
+  /**
+   * @return current line count; same value as {@link #lineCount()}
+   */
+  public long getLineCount() {
+    return lineCount;
+  }
+
+  /**
+   * Closes the underlying input stream.
+   *
+   * @throws IOException if closing the stream fails
+   */
+  public void close() throws IOException{
+    input.close();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
new file mode 100644
index 000000000..48c184991
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+
+/**
+ * Contract between the text parser and whatever consumes the parsed data.
+ * The parser reports record and field boundaries and pushes field content
+ * one byte at a time; implementations decide what to do with it (for
+ * example, fill the value vectors of a record batch).
+ */
+abstract class TextOutput {
+
+  /**
+   * Begin processing a new record.
+   */
+  public abstract void startRecord();
+
+  /**
+   * Begin processing a new field within the current record.
+   *
+   * @param index index of the field within the record
+   */
+  public abstract void startField(int index);
+
+  /**
+   * Finish processing the current field.
+   *
+   * @return true if the parser should keep processing the record, false if
+   * the remainder of the record can be skipped
+   */
+  public abstract boolean endField();
+
+  /**
+   * Shortcut telling the output that the current field ended without any
+   * data.
+   *
+   * @return true if the parser should keep processing the record, false if
+   * the remainder of the record can be skipped
+   */
+  public abstract boolean endEmptyField();
+
+  /**
+   * Append the given byte unless {@code TextReader.isWhite} reports it as
+   * whitespace, in which case it is dropped.
+   *
+   * @param data byte to append
+   */
+  public void appendIgnoringWhitespace(byte data) {
+    if (!TextReader.isWhite(data)) {
+      append(data);
+    }
+  }
+
+  /**
+   * Append one byte to the data of the field currently being built.
+   *
+   * @param data byte just read by the parser
+   */
+  public abstract void append(byte data);
+
+  /**
+   * Finish the current record, which also finishes the last field being
+   * read.
+   */
+  public abstract void finishRecord();
+
+  /**
+   * @return the total number of records processed so far, across batches
+   */
+  public abstract long getRecordCount();
+
+  /**
+   * Tells the parser whether the current batch has filled up.
+   *
+   * @return true if the batch is full and the reader must return so the
+   * batch can be sent downstream, false if more rows may still be added to
+   * the current batch
+   */
+  public abstract boolean isFull();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
new file mode 100644
index 000000000..86cad4c88
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.io.IOException;
+
+import com.univocity.parsers.common.ParsingContext;
+
+/**
+ * {@link ParsingContext} implementation backed by a {@link TextInput} and a
+ * {@link TextOutput}. Line and character positions come from the input, the
+ * record count and batch-full state from the output. Several interface
+ * methods return fixed placeholder values.
+ */
+class TextParsingContext implements ParsingContext {
+
+  private final TextInput input;
+  private final TextOutput output;
+
+  /** Set by {@link #stop()}; reported by {@link #isStopped()}. */
+  protected boolean stopped;
+
+  /** Never assigned in this class, so it remains null. */
+  private int[] extractedIndexes;
+
+  /**
+   * @param input source of line and character positions
+   * @param output source of the record count and batch-full state
+   */
+  public TextParsingContext(TextInput input, TextOutput output) {
+    this.input = input;
+    this.output = output;
+  }
+
+  /** Marks this context as stopped. */
+  @Override
+  public void stop() {
+    stopped = true;
+  }
+
+  /** @return true once {@link #stop()} has been called */
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  /** @return the line count reported by the input */
+  @Override
+  public long currentLine() {
+    return input.lineCount();
+  }
+
+  /** @return the character count reported by the input */
+  @Override
+  public long currentChar() {
+    return input.charCount();
+  }
+
+  /** @return always -1 */
+  @Override
+  public int currentColumn() {
+    return -1;
+  }
+
+  /** @return always a new, empty array */
+  @Override
+  public String[] headers() {
+    return new String[0];
+  }
+
+  /** @return the extracted-index array, which this class never populates */
+  @Override
+  public int[] extractedFieldIndexes() {
+    return extractedIndexes;
+  }
+
+  /** @return the record count reported by the output */
+  @Override
+  public long currentRecord() {
+    return output.getRecordCount();
+  }
+
+  /**
+   * @return whatever the input reports as the content consumed since its
+   * last mark
+   * @throws RuntimeException wrapping any {@link IOException} raised by the
+   * input
+   */
+  @Override
+  public String currentParsedContent() {
+    try {
+      return input.getStringSinceMarkForError();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** No-op: the request is ignored. */
+  @Override
+  public void skipLines(int lines) {
+    // Intentionally empty.
+  }
+
+  /** @return always false */
+  @Override
+  public boolean columnsReordered() {
+    return false;
+  }
+
+  /** @return true if the output reports that the current batch is full */
+  public boolean isFull() {
+    return output.isFull();
+  }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
new file mode 100644
index 000000000..0341b4554
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+// TODO: Remove the "V3" suffix once the V2 version is retired.
+/**
+ * Mutable bag of options that control how delimited text is parsed: quote,
+ * escape, delimiter and comment bytes, newline handling, whitespace trimming
+ * and header handling. Values start at the defaults given by the field
+ * initializers and can be overridden from a {@link TextFormatConfig} via
+ * {@link #set(TextFormatConfig)} or one at a time through the setters.
+ */
+public class TextParsingSettingsV3 {
+
+  /**
+   * Instance holding the default values. Note that it exposes the same
+   * setters as any other instance.
+   */
+  public static final TextParsingSettingsV3 DEFAULT = new TextParsingSettingsV3();
+
+  private String emptyValue = null;
+  private boolean parseUnescapedQuotes = true;
+  private byte quote = b('"');
+  private byte quoteEscape = b('"');
+  private byte delimiter = b(',');
+  private byte comment = b('#');
+
+  private long maxCharsPerColumn = Character.MAX_VALUE;
+  private byte normalizedNewLine = b('\n');
+  private byte[] newLineDelimiter = {normalizedNewLine};
+  private boolean ignoreLeadingWhitespaces;
+  private boolean ignoreTrailingWhitespaces;
+  private String lineSeparatorString = "\n";
+  private boolean skipFirstLine;
+
+  private boolean headerExtractionEnabled;
+  private boolean useRepeatedVarChar = true;
+
+  /**
+   * Copies the quote, escape, line delimiter, field delimiter, comment,
+   * skip-first-line and header-extraction options from the format config.
+   * Enabling header extraction also switches off the repeated-VarChar mode.
+   *
+   * @param config format configuration to copy from
+   * @throws IllegalArgumentException if the quote, escape, field delimiter
+   * or comment character is above 127
+   */
+  public void set(TextFormatConfig config){
+    this.quote = bSafe(config.getQuote(), "quote");
+    this.quoteEscape = bSafe(config.getEscape(), "escape");
+    this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
+    this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
+    this.comment = bSafe(config.getComment(), "comment");
+    this.skipFirstLine = config.isSkipFirstLine();
+    this.headerExtractionEnabled = config.isHeaderExtractionEnabled();
+    if (this.headerExtractionEnabled) {
+      // In case of header TextRecordReader will use set of VarChar vectors vs RepeatedVarChar
+      this.useRepeatedVarChar = false;
+    }
+  }
+
+  /** @return the comment byte; defaults to '#' */
+  public byte getComment() {
+    return comment;
+  }
+
+  /** @return the skip-first-line flag */
+  public boolean isSkipFirstLine() {
+    return skipFirstLine;
+  }
+
+  /** @param skipFirstLine new value of the skip-first-line flag */
+  public void setSkipFirstLine(boolean skipFirstLine) {
+    this.skipFirstLine = skipFirstLine;
+  }
+
+  /** @return the repeated-VarChar flag; defaults to true */
+  public boolean isUseRepeatedVarChar() {
+    return useRepeatedVarChar;
+  }
+
+  /** @param useRepeatedVarChar new value of the repeated-VarChar flag */
+  public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
+    this.useRepeatedVarChar = useRepeatedVarChar;
+  }
+
+  /**
+   * Narrows a configuration character to a byte, rejecting anything above
+   * 127.
+   *
+   * @param c character to convert
+   * @param name option name, used only in the error message
+   * @return the character as a byte
+   * @throws IllegalArgumentException if {@code c} is greater than 127
+   */
+  private static byte bSafe(char c, String name) {
+    if (c <= Byte.MAX_VALUE) {
+      return (byte) c;
+    }
+    throw new IllegalArgumentException(String.format("Failure validating configuration option %s. Expected a "
+        + "character between 0 and 127 but value was actually %d.", name, (int) c));
+  }
+
+  /** Unchecked narrowing of a character to a byte. */
+  private static byte b(char c) {
+    return (byte) c;
+  }
+
+  /** @return the byte sequence that ends a line; defaults to a single '\n' */
+  public byte[] getNewLineDelimiter() {
+    return newLineDelimiter;
+  }
+
+  /**
+   * Returns the quote character, which wraps values that contain the field
+   * delimiter. Defaults to '"'
+   *
+   * @return the quote character
+   */
+  public byte getQuote() {
+    return quote;
+  }
+
+  /**
+   * Sets the quote character, which wraps values that contain the field
+   * delimiter. Defaults to '"'
+   *
+   * @param quote
+   *          the quote character
+   */
+  public void setQuote(byte quote) {
+    this.quote = quote;
+  }
+
+  /** @return the line separator string; "\n" unless changed elsewhere */
+  public String getLineSeparatorString() {
+    return lineSeparatorString;
+  }
+
+  /**
+   * Tests whether the given byte is the quote character.
+   *
+   * @param ch
+   *          the byte to check
+   * @return true if {@code ch} equals the configured quote character, false
+   *         otherwise
+   */
+  public boolean isQuote(byte ch) {
+    return this.quote == ch;
+  }
+
+  /**
+   * Returns the character that escapes a quote inside an already quoted
+   * value. Defaults to '"'
+   *
+   * @return the quote escape character
+   */
+  public byte getQuoteEscape() {
+    return quoteEscape;
+  }
+
+  /**
+   * Sets the character that escapes a quote inside an already quoted value.
+   * Defaults to '"'
+   *
+   * @param quoteEscape
+   *          the quote escape character
+   */
+  public void setQuoteEscape(byte quoteEscape) {
+    this.quoteEscape = quoteEscape;
+  }
+
+  /**
+   * Tests whether the given byte is the quote escape character.
+   *
+   * @param ch
+   *          the byte to check
+   * @return true if {@code ch} equals the configured quote escape character,
+   *         false otherwise
+   */
+  public boolean isQuoteEscape(byte ch) {
+    return this.quoteEscape == ch;
+  }
+
+  /**
+   * Returns the field delimiter character. Defaults to ','
+   *
+   * @return the field delimiter character
+   */
+  public byte getDelimiter() {
+    return delimiter;
+  }
+
+  /**
+   * Sets the field delimiter character. Defaults to ','
+   *
+   * @param delimiter the field delimiter character
+   */
+  public void setDelimiter(byte delimiter) {
+    this.delimiter = delimiter;
+  }
+
+  /**
+   * Tests whether the given byte is the field delimiter.
+   *
+   * @param ch the byte to check
+   * @return true if {@code ch} equals the configured field delimiter, false
+   *         otherwise
+   */
+  public boolean isDelimiter(byte ch) {
+    return this.delimiter == ch;
+  }
+
+  /**
+   * Returns the String representation of an empty value (defaults to null)
+   *
+   * <p>
+   * When reading, if the parser does not read any character from the input,
+   * and the input is within quotes, this value is used instead of an empty
+   * string.
+   *
+   * @return the String representation of an empty value
+   */
+  public String getEmptyValue() {
+    return emptyValue;
+  }
+
+  /**
+   * Sets the String representation of an empty value (defaults to null)
+   *
+   * <p>
+   * When reading, if the parser does not read any character from the input,
+   * and the input is within quotes, this value is used instead of an empty
+   * string.
+   *
+   * @param emptyValue
+   *          the String representation of an empty value
+   */
+  public void setEmptyValue(String emptyValue) {
+    this.emptyValue = emptyValue;
+  }
+
+  /**
+   * Indicates whether the parser should accept unescaped quotes inside
+   * quoted values and parse them normally. Defaults to {@code true}.
+   *
+   * @return a flag indicating whether unescaped quotes inside quoted values
+   *         are accepted
+   */
+  public boolean isParseUnescapedQuotes() {
+    return parseUnescapedQuotes;
+  }
+
+  /**
+   * Configures how to handle unescaped quotes inside quoted values. If set
+   * to {@code true}, the parser will parse the quote normally as part of the
+   * value. If set to {@code false}, a
+   * {@link com.univocity.parsers.common.TextParsingException} will be thrown.
+   * Defaults to {@code true}.
+   *
+   * @param parseUnescapedQuotes
+   *          indicates whether unescaped quotes inside quoted values are
+   *          accepted
+   */
+  public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
+    this.parseUnescapedQuotes = parseUnescapedQuotes;
+  }
+
+  /**
+   * Indicates whether the first valid record parsed from the input is
+   * treated as the row holding the column names.
+   *
+   * @return true if the first valid record is treated as the header row,
+   *         false otherwise
+   */
+  public boolean isHeaderExtractionEnabled() {
+    return headerExtractionEnabled;
+  }
+
+  /**
+   * Defines whether the first valid record parsed from the input is treated
+   * as the row holding the column names.
+   *
+   * @param headerExtractionEnabled
+   *          true to treat the first valid record as the header row
+   */
+  public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
+    this.headerExtractionEnabled = headerExtractionEnabled;
+  }
+
+  /** @return the per-column character limit; defaults to {@code Character.MAX_VALUE} */
+  public long getMaxCharsPerColumn() {
+    return maxCharsPerColumn;
+  }
+
+  /** @param maxCharsPerColumn new per-column character limit */
+  public void setMaxCharsPerColumn(long maxCharsPerColumn) {
+    this.maxCharsPerColumn = maxCharsPerColumn;
+  }
+
+  /** @param comment new comment byte */
+  public void setComment(byte comment) {
+    this.comment = comment;
+  }
+
+  /** @return the normalized newline byte; defaults to '\n' */
+  public byte getNormalizedNewLine() {
+    return normalizedNewLine;
+  }
+
+  /** @param normalizedNewLine new normalized newline byte */
+  public void setNormalizedNewLine(byte normalizedNewLine) {
+    this.normalizedNewLine = normalizedNewLine;
+  }
+
+  /** @return the ignore-leading-whitespace flag; defaults to false */
+  public boolean isIgnoreLeadingWhitespaces() {
+    return ignoreLeadingWhitespaces;
+  }
+
+  /** @param ignoreLeadingWhitespaces new value of the ignore-leading-whitespace flag */
+  public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
+    this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
+  }
+
+  /** @return the ignore-trailing-whitespace flag; defaults to false */
+  public boolean isIgnoreTrailingWhitespaces() {
+    return ignoreTrailingWhitespaces;
+  }
+
+  /** @param ignoreTrailingWhitespaces new value of the ignore-trailing-whitespace flag */
+  public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
+    this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
new file mode 100644
index 000000000..17a076c0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.UserException;
+
+import com.univocity.parsers.common.TextParsingException;
+
+/*******************************************************************************
+ * Portions Copyright 2014 uniVocity Software Pty Ltd
+ ******************************************************************************/
+
+/**
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
+ * DrillBuf support.
+ */
+public final class TextReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
+
+ private static final byte NULL_BYTE = (byte) '\0'; // "no previous byte" value used while parsing quoted fields
+
+ public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024; // 65536; not referenced inside this class
+
+ private final TextParsingContext context; // passed to TextParsingException; supplies line/record numbers for error context
+
+ private final TextParsingSettingsV3 settings; // kept for error reporting (max chars per column, line separator string)
+
+ private final TextInput input;
+ private final TextOutput output;
+ private final DrillBuf workBuf; // scratch buffer for whitespace found after a closing quote
+
+ private byte ch; // most recently read input byte; shared by all parse methods
+
+ // index of the field within this record
+ private int fieldIndex;
+
+ /** Behavior settings **/
+ private final boolean ignoreTrailingWhitespace; // selects parseValueIgnore() over parseValueAll()
+ private final boolean ignoreLeadingWhitespace; // skip whitespace at the start of a record/field
+ private final boolean parseUnescapedQuotes; // tolerate a quote inside a quoted value instead of failing
+
+ /** Key Characters **/
+ private final byte comment; // a record whose first byte equals this is skipped as a comment line
+ private final byte delimiter;
+ private final byte quote;
+ private final byte quoteEscape;
+ private final byte newLine; // normalized newline byte from the settings; terminates a record
+
+ /**
+  * Creates a reader bound to one input and one output. The key bytes
+  * (delimiter, quote, quote escape, newline, comment) and the whitespace and
+  * quote-handling flags are copied out of the settings once, here, so the
+  * parsing loops only read final fields.
+  *
+  * @param settings the parser configuration
+  * @param input input stream
+  * @param output interface to produce output record batch
+  * @param workBuf working buffer to handle whitespace
+  */
+ public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) {
+   // Collaborators.
+   this.input = input;
+   this.output = output;
+   this.workBuf = workBuf;
+   this.settings = settings;
+   this.context = new TextParsingContext(input, output);
+
+   // Key bytes.
+   this.comment = settings.getComment();
+   this.delimiter = settings.getDelimiter();
+   this.quote = settings.getQuote();
+   this.quoteEscape = settings.getQuoteEscape();
+   this.newLine = settings.getNormalizedNewLine();
+
+   // Behavior flags.
+   this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces();
+   this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces();
+   this.parseUnescapedQuotes = settings.isParseUnescapedQuotes();
+ }
+
+ public TextOutput getOutput() { return output; }
+
+ /**
+  * Tells whether a byte counts as white space. Following the univocity text
+  * reader, every ASCII value up to and including the space character
+  * qualifies. Java bytes are signed, so negative values (bytes of 0x80 and
+  * above) are explicitly excluded.
+  *
+  * @param b the byte to classify
+  * @return true if {@code b} lies in the range 0 to ' ' inclusive
+  */
+ static final boolean isWhite(byte b) {
+   return b >= 0 && b <= ' ';
+ }
+
+ /**
+ * Inform the output interface to indicate we are starting a new record batch
+ */
+ public void resetForNextBatch() { }
+
+ public long getPos() { return input.getPos(); }
+
+ /**
+  * Parses one complete record, handing each field to parseField(). The input
+  * position is marked first so the bytes consumed for this record are
+  * available for error reporting. If the stream ends after at least one
+  * field has been started, the partial record is still finished and emitted;
+  * if it ends before any field, the end-of-stream signal is passed on.
+  *
+  * @return always true; end of input is signalled by exception instead
+  * @throws IOException for input file read errors
+  */
+ private boolean parseRecord() throws IOException {
+   final TextInput in = this.input;
+   final byte lineEnd = this.newLine;
+
+   in.mark();
+
+   fieldIndex = 0;
+   if (isWhite(ch) && ignoreLeadingWhitespace) {
+     skipWhitespace();
+   }
+
+   output.startRecord();
+   int fieldCount = 0;
+   try {
+     while (ch != lineEnd) {
+       // The result of parseField() is deliberately ignored: early
+       // termination of a record is disabled. See DRILL-5914.
+       parseField();
+       fieldCount++;
+       if (ch == lineEnd) {
+         continue;
+       }
+       ch = in.nextChar();
+       if (ch == lineEnd) {
+         // A delimiter directly before the newline: emit a trailing empty field.
+         output.startField(fieldCount++);
+         output.endEmptyField();
+         break;
+       }
+     }
+     output.finishRecord();
+   } catch (StreamFinishedPseudoException e) {
+     // Nothing was written for this row: this is a genuine end of input.
+     if (fieldCount == 0) {
+       throw e;
+     }
+     // Part or all of a field was written, so the row is still sent.
+     output.finishRecord();
+   }
+   return true;
+ }
+
+ /**
+  * Parses an unquoted field up to the next delimiter or newline, passing
+  * every byte through appendIgnoringWhitespace(). Note that this drops every
+  * whitespace byte in the field, including those between non-white bytes.
+  *
+  * @throws IOException for input file read errors
+  */
+ private void parseValueIgnore() throws IOException {
+   final TextInput in = this.input;
+   final byte fieldEnd = this.delimiter;
+   final byte lineEnd = this.newLine;
+
+   // Work on a local copy; the field is written back only after the loop
+   // completes normally.
+   byte current = this.ch;
+   for (; current != fieldEnd && current != lineEnd; current = in.nextChar()) {
+     appendIgnoringWhitespace(current);
+   }
+   this.ch = current;
+ }
+
+ /**
+  * Appends a byte to the output unless isWhite() classifies it as white space.
+  *
+  * @param data the byte to append
+  */
+ public void appendIgnoringWhitespace(byte data) {
+   if (isWhite(data)) {
+     return;
+   }
+   output.append(data);
+ }
+
+ /**
+  * Parses an unquoted field, appending every byte up to (but not including)
+  * the next delimiter or newline to the output, white space included.
+  *
+  * @throws IOException for input file read errors
+  */
+ private void parseValueAll() throws IOException {
+   final TextInput in = this.input;
+   final TextOutput out = this.output;
+   final byte fieldEnd = this.delimiter;
+   final byte lineEnd = this.newLine;
+
+   // Work on a local copy; the field is written back only after the loop
+   // completes normally.
+   byte current = this.ch;
+   for (; current != fieldEnd && current != lineEnd; current = in.nextChar()) {
+     out.append(current);
+   }
+   this.ch = current;
+ }
+
+ /**
+  * Parses one unquoted field using the strategy selected by the
+  * ignoreTrailingWhitespace setting: parseValueIgnore() when it is set,
+  * parseValueAll() otherwise.
+  *
+  * @throws IOException
+  *           for input file read errors
+  */
+ private void parseValue() throws IOException {
+   if (!ignoreTrailingWhitespace) {
+     parseValueAll();
+     return;
+   }
+   parseValueIgnore();
+ }
+
+ /**
+  * Parses a quoted field. Called by parseField() with {@code ch} on the
+  * opening quote, and recursively by itself when unescaped quotes are
+  * allowed and more content follows what looked like a closing quote. On
+  * normal return {@code ch} is the delimiter or newline that ends the field.
+  * <p>
+  * White space after the closing quote is tested with {@link #isWhite(byte)}
+  * so that bytes of 0x80 and above (negative as a Java {@code byte}, e.g.
+  * UTF-8 multi-byte sequences) are treated as content, not as white space.
+  *
+  * @param prev previous byte read
+  * @throws IOException for input file read errors
+  * @throws TextParsingException if an unescaped quote is found while
+  *           parseUnescapedQuotes is off, or if anything other than the
+  *           delimiter or a newline follows the quoted value
+  */
+ private void parseQuotedValue(byte prev) throws IOException {
+   final byte newLine = this.newLine;
+   final byte delimiter = this.delimiter;
+   final TextOutput output = this.output;
+   final TextInput input = this.input;
+   final byte quote = this.quote;
+
+   ch = input.nextCharNoNewLineCheck();
+
+   // Runs until a quote is directly followed by a delimiter, newline or white space.
+   while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
+     if (ch != quote) {
+       if (prev == quote) { // unescaped quote detected
+         if (parseUnescapedQuotes) {
+           output.append(quote);
+           output.append(ch);
+           parseQuotedValue(ch);
+           break;
+         } else {
+           throw new TextParsingException(
+               context,
+               "Unescaped quote character '"
+                   + (char) (quote & 0xFF)
+                   + "' inside quoted value of CSV field. To allow unescaped quotes, "
+                   + "set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. "
+                   + "Cannot parse CSV input.");
+         }
+       }
+       output.append(ch);
+       prev = ch;
+     } else if (prev == quoteEscape) {
+       // Escape followed by a quote: emit one literal quote and clear prev so
+       // this quote is not itself taken as an escape or a closing quote.
+       output.append(quote);
+       prev = NULL_BYTE;
+     } else {
+       // A quote whose role (closing, or first half of an escape) depends on
+       // the next byte.
+       prev = ch;
+     }
+     ch = input.nextCharNoNewLineCheck();
+   }
+
+   // Handles whitespace after quoted value:
+   // Whitespace is ignored unless it is the delimiter. For example, in
+   // tab-separated files (TSV files), '\t' is the delimiter and must not be ignored.
+   // Content after whitespace may be parsed if 'parseUnescapedQuotes' is enabled.
+   if (ch != newLine && isWhite(ch) && ch != delimiter) {
+     final DrillBuf workBuf = this.workBuf;
+     workBuf.resetWriterIndex();
+     do {
+       // saves whitespace after value
+       workBuf.writeByte(ch);
+       ch = input.nextChar();
+       // found a new line, go to next record.
+       if (ch == newLine) {
+         return;
+       }
+     } while (isWhite(ch) && ch != delimiter);
+
+     // there's more stuff after the quoted value, not only empty spaces.
+     if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) {
+
+       output.append(quote);
+       for (int i = 0; i < workBuf.writerIndex(); i++) {
+         output.append(workBuf.getByte(i));
+       }
+       // the next character is not the escape character, put it there
+       if (ch != quoteEscape) {
+         output.append(ch);
+       }
+       // sets this character as the previous character (may be escaping)
+       // calls recursively to keep parsing potentially quoted content
+       parseQuotedValue(ch);
+     }
+   }
+
+   if (!(ch == delimiter || ch == newLine)) {
+     // Cast through an unsigned value so the message shows the character
+     // rather than the numeric value of the byte.
+     throw new TextParsingException(context, "Unexpected character '" + (char) (ch & 0xFF)
+         + "' following quoted value of CSV field. Expecting '" + (char) (delimiter & 0xFF)
+         + "'. Cannot parse CSV input.");
+   }
+ }
+
+ /**
+  * Parses a single field: opens it on the output, optionally skips leading
+  * white space, then dispatches to the empty, quoted or unquoted handler
+  * according to the current byte.
+  *
+  * @return whatever the output reports when the field is closed
+  * @throws IOException for input file read errors
+  */
+ private final boolean parseField() throws IOException {
+
+   output.startField(fieldIndex++);
+
+   if (ignoreLeadingWhitespace && isWhite(ch)) {
+     skipWhitespace();
+   }
+
+   // Already on the delimiter: the field is empty.
+   if (ch == delimiter) {
+     return output.endEmptyField();
+   }
+
+   // We hold the first byte of the field. Parse and save it even if the
+   // input ends part-way through: an EOF here means the last line has data
+   // but no terminating newline.
+   try {
+     if (ch != quote) {
+       parseValue();
+     } else {
+       parseQuotedValue(NULL_BYTE);
+     }
+     return output.endField();
+   } catch (StreamFinishedPseudoException eof) {
+     return output.endField();
+   }
+ }
+
+ /**
+  * Advances the input past white space, stopping at the first byte that is
+  * not white space or that is the delimiter or the newline (either of which
+  * may itself be a white-space byte).
+  *
+  * @throws IOException for input file read errors
+  */
+ private void skipWhitespace() throws IOException {
+   final TextInput in = this.input;
+   final byte fieldEnd = this.delimiter;
+   final byte lineEnd = this.newLine;
+
+   for (;;) {
+     if (!isWhite(ch) || ch == fieldEnd || ch == lineEnd) {
+       return;
+     }
+     ch = in.nextChar();
+   }
+ }
+
+ /**
+  * Entry point for the reader: clears the context's stopped flag and starts
+  * the input.
+  *
+  * @throws IOException for input file read errors
+  */
+ public final void start() throws IOException {
+   this.context.stopped = false;
+   this.input.start();
+ }
+
+ /**
+  * Parses the next record from the input. Lines whose first byte is the
+  * comment byte are skipped first; this is needed when the file contains
+  * headers.
+  *
+  * @return the result of parseRecord(), or false once the input is exhausted
+  * @throws IOException for input file read errors
+  */
+ public final boolean parseNext() throws IOException {
+   try {
+     // Move to the first byte of the next line that is not a comment.
+     while (!context.stopped) {
+       ch = input.nextChar();
+       if (ch != comment) {
+         break;
+       }
+       input.skipLines(1);
+     }
+
+     final long linesBefore = input.lineCount();
+     final boolean parsed = parseRecord();
+     // A single record must not span more than one input line.
+     if (input.lineCount() > linesBefore + 1) {
+       throw new TextParsingException(context, "Cannot use newline character within quoted string");
+     }
+     return parsed;
+   } catch (UserException ex) {
+     stopParsing();
+     throw ex;
+   } catch (StreamFinishedPseudoException ex) {
+     // Normal end of input.
+     stopParsing();
+     return false;
+   } catch (Exception ex) {
+     try {
+       throw handleException(ex);
+     } finally {
+       stopParsing();
+     }
+   }
+ }
+
+ /**
+  * Called on every exit path of parseNext() that ends in an exception or end
+  * of input. Currently does nothing.
+  */
+ private void stopParsing() {
+   // Intentionally empty.
+ }
+
+ /**
+  * Makes line separators visible in a string meant for an error message.
+  *
+  * @param str the text to render
+  * @param addNewLine if true, each separator of the dominant kind (CRLF, else
+  *          LF, else CR) is replaced by a bracketed escape, the separator
+  *          itself and a tab; if false, every LF and CR is replaced by its
+  *          two-character escape
+  * @return the rendered text
+  */
+ private String displayLineSeparators(String str, boolean addNewLine) {
+   if (!addNewLine) {
+     return str.replace("\n", "\\n").replace("\r", "\\r");
+   }
+   if (str.contains("\r\n")) {
+     return str.replace("\r\n", "[\\r\\n]\r\n\t");
+   }
+   if (str.contains("\n")) {
+     return str.replace("\n", "[\\n]\n\t");
+   }
+   return str.replace("\r", "[\\r]\r\t");
+ }
+
+ /**
+ * Helper method to handle exceptions caught while processing text files and generate better error messages associated with
+ * the exception.
+ * @param ex Exception raised
+ * @throws IOException for input file read errors
+ */
+ private TextParsingException handleException(Exception ex) throws IOException {
+
+ if (ex instanceof TextParsingException) {
+ throw (TextParsingException) ex;
+ }
+
+ String message = null;
+ String tmp = input.getStringSinceMarkForError();
+ char[] chars = tmp.toCharArray();
+ if (chars != null) {
+ int length = chars.length;
+ if (length > settings.getMaxCharsPerColumn()) {
+ message = "Length of parsed input (" + length
+ + ") exceeds the maximum number of characters defined in your parser settings ("
+ + settings.getMaxCharsPerColumn() + "). ";
+ }
+
+ if (tmp.contains("\n") || tmp.contains("\r")) {
+ tmp = displayLineSeparators(tmp, true);
+ String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false);
+ message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '"
+ + lineSeparator + "'. Parsed content:\n\t" + tmp;
+ }
+
+ int nullCharacterCount = 0;
+ // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError
+ int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length;
+ StringBuilder s = new StringBuilder(maxLength);
+ for (int i = 0; i < maxLength; i++) {
+ if (chars[i] == '\0') {
+ s.append('\\');
+ s.append('0');
+ nullCharacterCount++;
+ } else {
+ s.append(chars[i]);
+ }
+ }
+ tmp = s.toString();
+
+ if (nullCharacterCount > 0) {
+ message += "\nIdentified "
+ + nullCharacterCount
+ + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t"
+ + tmp;
+ }
+ }
+
+ UserException.Builder builder;
+ if (ex instanceof UserException) {
+ builder = ((UserException) ex).rebuild();
+ } else {
+ builder = UserException
+ .dataReadError(ex)
+ .message(message);
+ }
+ throw builder
+ .addContext("Line", context.currentLine())
+ .addContext("Record", context.currentRecord())
+ .build(logger);
+ }
+
+ /**
+ * Finish the processing of a batch, indicates to the output
+ * interface to wrap up the batch
+ */
+ public void finishBatch() { }
+
+ /**
+  * Releases the reader once there are no more records to read. The only
+  * action is closing the underlying input.
+  *
+  * @throws IOException for input file read errors
+  */
+ public void close() throws IOException {
+   this.input.close();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
new file mode 100644
index 000000000..aced5adfd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Version 3 of the text reader. Hosts the "compliant" text reader on
+ * the row set framework.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;