diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant')
13 files changed, 2373 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java new file mode 100644 index 000000000..0ed3155a0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Original version of the "compliant" text reader. This is version 2 of + * the text reader. This version is retained for temporary backward + * compatibility as we productize the newer version 3 based on the + * row set framework. + * <p> + * TODO: Remove the files in this package and move the files from the + * "v3" sub-package here once the version 3 implementation stabilizes. + */ +package org.apache.drill.exec.store.easy.text.compliant; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java new file mode 100644 index 000000000..6bf0bb69c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; + +public abstract class BaseFieldOutput extends TextOutput { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); + private static final int MAX_FIELD_LENGTH = 1024 * 64; + + // track which field is getting appended + protected int currentFieldIndex = -1; + // track chars within field + protected int currentDataPointer; + // track if field is still getting appended + private boolean fieldOpen = true; + // holds chars for a field + protected byte[] fieldBytes; + protected final RowSetLoader writer; + private final boolean[] projectionMask; + protected final int maxField; + protected boolean fieldProjected; + + /** + * Initialize the field output for one of three scenarios: + * <ul> + * <li>SELECT all: SELECT *, SELECT columns. Indicated by a non -1 + * max fields.</li> + * <li>SELECT none: SELECT COUNT(*), etc. Indicated by a max field + * of -1.</li> + * <li>SELECT a, b, c indicated by a non-null projection mask that + * identifies the indexes of the fields to be selected. In this case, + * this constructor computes the maximum field.</li> + * </ul> + * + * @param writer Row set writer that provides access to the writer for + * each column + * @param maxField the index of the last field to store. May be -1 if no + * fields are to be stored. Computed if the projection mask is set + * @param projectionMask a boolean array indicating which fields are + * to be projected to the output. Optional + */ + + public BaseFieldOutput(RowSetLoader writer, int maxField, boolean[] projectionMask) { + this.writer = writer; + this.projectionMask = projectionMask; + + // If no projection mask is defined, then we want all columns + // up to the max field, which may be -1 if we want to select + // nothing. + + if (projectionMask == null) { + this.maxField = maxField; + } else { + + // Otherwise, use the projection mask to determine + // which fields are to be projected. (The file may well + // contain more than the projected set.) + + int end = projectionMask.length - 1; + while (end >= 0 && ! projectionMask[end]) { + end--; + } + this.maxField = end; + } + + // If we project at least one field, allocate a buffer. + + if (maxField >= 0) { + fieldBytes = new byte[MAX_FIELD_LENGTH]; + } + } + + /** + * Start a new record record. Resets all pointers + */ + + @Override + public void startRecord() { + currentFieldIndex = -1; + fieldOpen = false; + writer.start(); + } + + @Override + public void startField(int index) { + assert index == currentFieldIndex + 1; + currentFieldIndex = index; + currentDataPointer = 0; + fieldOpen = true; + + // Figure out if this field is projected. + + if (projectionMask == null) { + fieldProjected = currentFieldIndex <= maxField; + } else if (currentFieldIndex >= projectionMask.length) { + fieldProjected = false; + } else { + fieldProjected = projectionMask[currentFieldIndex]; + } + } + + @Override + public void append(byte data) { + if (! fieldProjected) { + return; + } + if (currentDataPointer >= MAX_FIELD_LENGTH - 1) { + throw UserException + .unsupportedError() + .message("Text column is too large.") + .addContext("Column", currentFieldIndex) + .addContext("Limit", MAX_FIELD_LENGTH) + .build(logger); + } + + fieldBytes[currentDataPointer++] = data; + } + + @Override + public boolean endField() { + fieldOpen = false; + return currentFieldIndex < maxField; + } + + @Override + public boolean endEmptyField() { + return endField(); + } + + @Override + public void finishRecord() { + if (fieldOpen) { + endField(); + } + writer.save(); + } + + @Override + public long getRecordCount() { + return writer.rowCount(); + } + + @Override + public boolean isFull() { + return writer.isFull(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java new file mode 100644 index 000000000..e489003c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.hadoop.mapred.FileSplit; + +import com.univocity.parsers.common.TextParsingException; + +import io.netty.buffer.DrillBuf; + +/** + * New text reader, complies with the RFC 4180 standard for text/csv files + */ +public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextBatchReader.class); + + private static final int MAX_RECORDS_PER_BATCH = 8096; + private static final int READ_BUFFER = 1024 * 1024; + private static final int WHITE_SPACE_BUFFER = 64 * 1024; + + // settings to be used while parsing + private final TextParsingSettingsV3 settings; + // Chunk of the file to be read by this reader + private final FileSplit split; + // text reader implementation + private TextReader reader; + // input buffer + private DrillBuf readBuffer; + // working buffer to handle whitespaces + private DrillBuf whitespaceBuffer; + private final DrillFileSystem dfs; + + private RowSetLoader writer; + + public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) { + this.split = split; + this.settings = settings; + this.dfs = dfs; + + // Validate. Otherwise, these problems show up later as a data + // read error which is very confusing. + + if (settings.getNewLineDelimiter().length == 0) { + throw UserException + .validationError() + .message("The text format line delimiter cannot be blank.") + .build(logger); + } + } + + /** + * Performs the initial setup required for the record reader. + * Initializes the input stream, handling of the output record batch + * and the actual reader to be used. + * @param context operator context from which buffer's will be allocated and managed + * @param outputMutator Used to create the schema in the output record batch + */ + + @Override + public boolean open(ColumnsSchemaNegotiator schemaNegotiator) { + final OperatorContext context = schemaNegotiator.context(); + + // Note: DO NOT use managed buffers here. They remain in existence + // until the fragment is shut down. The buffers here are large. + // If we scan 1000 files, and allocate 1 MB for each, we end up + // holding onto 1 GB of memory in managed buffers. + // Instead, we allocate the buffers explicitly, and must free + // them. + + readBuffer = context.getAllocator().buffer(READ_BUFFER); + whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER); + + // TODO: Set this based on size of record rather than + // absolute count. + + schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH); + + // setup Output, Input, and Reader + try { + TextOutput output; + + if (settings.isHeaderExtractionEnabled()) { + output = openWithHeaders(schemaNegotiator); + } else { + output = openWithoutHeaders(schemaNegotiator); + } + if (output == null) { + return false; + } + openReader(output); + return true; + } catch (final IOException e) { + throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger); + } + } + + /** + * Extract header and use that to define the reader schema. + */ + + private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException { + final String [] fieldNames = extractHeader(); + if (fieldNames == null) { + return null; + } + final TupleMetadata schema = new TupleSchema(); + for (final String colName : fieldNames) { + schema.addColumn(MetadataUtils.newScalar(colName, MinorType.VARCHAR, DataMode.REQUIRED)); + } + schemaNegotiator.setTableSchema(schema, true); + writer = schemaNegotiator.build().writer(); + return new FieldVarCharOutput(writer); + } + + /** + * When no headers, create a single array column "columns". + */ + + private TextOutput openWithoutHeaders( + ColumnsSchemaNegotiator schemaNegotiator) { + final TupleMetadata schema = new TupleSchema(); + schema.addColumn(MetadataUtils.newScalar(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR, DataMode.REPEATED)); + schemaNegotiator.setTableSchema(schema, true); + writer = schemaNegotiator.build().writer(); + return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes()); + } + + private void openReader(TextOutput output) throws IOException { + logger.trace("Opening file {}", split.getPath()); + final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath()); + final TextInput input = new TextInput(settings, stream, readBuffer, + split.getStart(), split.getStart() + split.getLength()); + + // setup Reader using Input and Output + reader = new TextReader(settings, input, output, whitespaceBuffer); + reader.start(); + } + + /** + * Extracts header from text file. + * Currently it is assumed to be first line if headerExtractionEnabled is set to true + * TODO: enhance to support more common header patterns + * @return field name strings + */ + + private String [] extractHeader() throws IOException { + assert settings.isHeaderExtractionEnabled(); + + // don't skip header in case skipFirstLine is set true + settings.setSkipFirstLine(false); + + final HeaderBuilder hOutput = new HeaderBuilder(split.getPath()); + + // setup Input using InputStream + // we should read file header irrespective of split given given to this reader + final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath()); + final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength()); + + // setup Reader using Input and Output + this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer); + reader.start(); + + // extract first row only + reader.parseNext(); + + // grab the field names from output + final String [] fieldNames = hOutput.getHeaders(); + + // cleanup and set to skip the first line next time we read input + reader.close(); + settings.setSkipFirstLine(true); + + readBuffer.clear(); + whitespaceBuffer.clear(); + return fieldNames; + } + + /** + * Generates the next record batch + * @return number of records in the batch + */ + + @Override + public boolean next() { + reader.resetForNextBatch(); + + try { + boolean more = false; + while (! writer.isFull()) { + more = reader.parseNext(); + if (! more) { + break; + } + } + reader.finishBatch(); + + // Return false on the batch that hits EOF. The scan operator + // knows to process any rows in this final batch. + + return more && writer.rowCount() > 0; + } catch (IOException | TextParsingException e) { + if (e.getCause() != null && e.getCause() instanceof UserException) { + throw (UserException) e.getCause(); + } + throw UserException.dataReadError(e) + .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.", + split.getPath(), reader.getPos()) + .build(logger); + } + } + + /** + * Cleanup state once we are finished processing all the records. + * This would internally close the input stream we are reading from. + */ + @Override + public void close() { + + // Release the buffers allocated above. Double-check to handle + // unexpected multiple calls to close(). + + if (readBuffer != null) { + readBuffer.release(); + readBuffer = null; + } + if (whitespaceBuffer != null) { + whitespaceBuffer.release(); + whitespaceBuffer = null; + } + try { + if (reader != null) { + reader.close(); + reader = null; + } + } catch (final IOException e) { + logger.warn("Exception while closing stream.", e); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java new file mode 100644 index 000000000..df48a5548 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.TupleMetadata; + +/** + * Class is responsible for generating record batches for text file inputs. We generate + * a record batch with a set of varchar vectors. A varchar vector contains all the field + * values for a given column. Each record is a single value within each vector of the set. + */ +class FieldVarCharOutput extends BaseFieldOutput { + + /** + * We initialize and add the varchar vector for each incoming field in this + * constructor. + * @param outputMutator Used to create/modify schema + * @param fieldNames Incoming field names + * @param columns List of columns selected in the query + * @param isStarQuery boolean to indicate if all fields are selected or not + */ + public FieldVarCharOutput(RowSetLoader writer) { + super(writer, + TextReader.MAXIMUM_NUMBER_COLUMNS, + makeMask(writer)); + } + + private static boolean[] makeMask(RowSetLoader writer) { + final TupleMetadata schema = writer.tupleSchema(); + final boolean projectionMask[] = new boolean[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + projectionMask[i] = schema.metadata(i).isProjected(); + } + return projectionMask; + } + + @Override + public boolean endField() { + if (fieldProjected) { + writer.scalar(currentFieldIndex) + .setBytes(fieldBytes, currentDataPointer); + } + + return super.endField(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java new file mode 100644 index 000000000..62eafc898 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.fs.Path; + +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; + +/** + * Text output that implements a header reader/parser. + * The caller parses out the characters of each header; + * this class assembles UTF-8 bytes into Unicode characters, + * fixes invalid characters (those not legal for SQL symbols), + * and maps duplicate names to unique names. + * <p> + * That is, this class is as permissive as possible with file + * headers to avoid spurious query failures for trivial reasons. + */ + +// Note: this class uses Java heap strings and the usual Java +// convenience classes. Since we do heavy Unicode string operations, +// and read a single row, there is no good reason to try to use +// value vectors and direct memory for this task. + +public class HeaderBuilder extends TextOutput { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class); + + /** + * Maximum Drill symbol length, as enforced for headers. + * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier"> + * identifier documentation</a> + */ + // TODO: Replace with the proper constant, if available + public static final int MAX_HEADER_LEN = 1024; + + /** + * Prefix used to replace non-alphabetic characters at the start of + * a column name. For example, $foo becomes col_foo. Used + * because SQL does not allow _foo. + */ + + public static final String COLUMN_PREFIX = "col_"; + + /** + * Prefix used to create numbered columns for missing + * headers. Typical names: column_1, column_2, ... + */ + + public static final String ANONYMOUS_COLUMN_PREFIX = "column_"; + + public final Path filePath; + public final List<String> headers = new ArrayList<>(); + public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN); + + public HeaderBuilder(Path filePath) { + this.filePath = filePath; + } + + @Override + public void startField(int index) { + currentField.clear(); + } + + @Override + public boolean endField() { + String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8); + header = validateSymbol(header); + headers.add(header); + return true; + } + + @Override + public boolean endEmptyField() { + + // Empty header will be rewritten to "column_<n>". + + return endField(); + } + + /** + * Validate the header name according to the SQL lexical rules. + * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier"> + * identifier documentation</a> + * @param header the header name to validate + */ + + // TODO: Replace with existing code, if any. + private String validateSymbol(String header) { + header = header.trim(); + + // To avoid unnecessary query failures, just make up a column name + // if the name is missing or all blanks. + + if (header.isEmpty()) { + return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1); + } + if (! Character.isAlphabetic(header.charAt(0))) { + return rewriteHeader(header); + } + for (int i = 1; i < header.length(); i++) { + char ch = header.charAt(i); + if (! Character.isAlphabetic(ch) && + ! Character.isDigit(ch) && ch != '_') { + return rewriteHeader(header); + } + } + return header; + } + + /** + * Given an invalid header, rewrite it to replace illegal characters + * with valid ones. The header won't be what the user specified, + * but it will be a valid SQL identifier. This solution avoids failing + * queries due to corrupted or invalid header data. + * <p> + * Names with invalid first characters are mapped to "col_". Example: + * $foo maps to col_foo. If the only character is non-alphabetic, treat + * the column as anonymous and create a generic name: column_4, etc. + * <p> + * This mapping could create a column that exceeds the maximum length + * of 1024. Since that is not really a hard limit, we just live with the + * extra few characters. + * + * @param header the original header + * @return the rewritten header, valid for SQL + */ + + private String rewriteHeader(String header) { + final StringBuilder buf = new StringBuilder(); + + // If starts with non-alphabetic, can't map the character to + // underscore, so just tack on a prefix. + + char ch = header.charAt(0); + if (Character.isAlphabetic(ch)) { + buf.append(ch); + } else if (Character.isDigit(ch)) { + buf.append(COLUMN_PREFIX); + buf.append(ch); + + // For the strange case of only one character, format + // the same as an empty header. + + } else if (header.length() == 1) { + return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1); + } else { + buf.append(COLUMN_PREFIX); + } + + // Convert all remaining invalid characters to underscores + + for (int i = 1; i < header.length(); i++) { + ch = header.charAt(i); + if (Character.isAlphabetic(ch) || + Character.isDigit(ch) || ch == '_') { + buf.append(ch); + } else { + buf.append("_"); + } + } + return buf.toString(); + } + + @Override + public void append(byte data) { + + // Ensure the data fits. Note that, if the name is Unicode, the actual + // number of characters might be less than the limit even though the + // byte count exceeds the limit. Fixing this, in general, would require + // a buffer four times larger, so we leave that as a later improvement + // if ever needed. + + try { + currentField.put(data); + } catch (BufferOverflowException e) { + throw UserException.dataReadError() + .message("Column exceeds maximum length of %d", MAX_HEADER_LEN) + .addContext("File Path", filePath.toString()) + .build(logger); + } + } + + @Override + public void finishRecord() { + if (headers.isEmpty()) { + throw UserException.dataReadError() + .message("The file must define at least one header.") + .addContext("File Path", filePath.toString()) + .build(logger); + } + + // Force headers to be unique. + + final Set<String> idents = new HashSet<String>(); + for (int i = 0; i < headers.size(); i++) { + String header = headers.get(i); + String key = header.toLowerCase(); + + // Is the header a duplicate? + + if (idents.contains(key)) { + + // Make header unique by appending a suffix. + // This loop must end because we have a finite + // number of headers. + // The original column is assumed to be "1", so + // the first duplicate is "2", and so on. + // Note that this will map columns of the form: + // "col,col,col_2,col_2_2" to + // "col", "col_2", "col_2_2", "col_2_2_2". + // No mapping scheme is perfect... + + for (int l = 2; ; l++) { + final String rewritten = header + "_" + l; + key = rewritten.toLowerCase(); + if (! idents.contains(key)) { + headers.set(i, rewritten); + break; + } + } + } + idents.add(key); + } + } + + @Override + public void startRecord() { } + + public String[] getHeaders() { + + // Just return the headers: any needed checks were done in + // finishRecord() + + final String array[] = new String[headers.size()]; + return headers.toArray(array); + } + + // Not used. + @Override + public long getRecordCount() { return 0; } + + @Override + public boolean isFull() { return false; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java new file mode 100644 index 000000000..13b44509e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; + +/** + * Class is responsible for generating record batches for text file inputs. We generate + * a record batch with a single vector of type repeated varchar vector. Each record is a single + * value within the vector containing all the fields in the record as individual array elements. + */ +public class RepeatedVarCharOutput extends BaseFieldOutput { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); + + private final ScalarWriter columnWriter; + private final ArrayWriter arrayWriter; + + /** + * Provide the row set loader (which must have just one repeated Varchar + * column) and an optional array projection mask. + * @param projectionMask + * @param tupleLoader + */ + + public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) { + super(loader, + maxField(loader, projectionMask), + projectionMask); + arrayWriter = writer.array(0); + columnWriter = arrayWriter.scalar(); + } + + private static int maxField(RowSetLoader loader, boolean[] projectionMask) { + + // If the one and only field (`columns`) is not selected, then this + // is a COUNT(*) or similar query. Select nothing. + + if (! loader.tupleSchema().metadata(0).isProjected()) { + return -1; + } + + // If this is SELECT * or SELECT `columns` query, project all + // possible fields. + + if (projectionMask == null) { + return TextReader.MAXIMUM_NUMBER_COLUMNS; + } + + // Else, this is a SELECT columns[x], columns[y], ... query. + // Project only the requested element members (fields). + + int end = projectionMask.length - 1; + while (end >= 0 && ! projectionMask[end]) { + end--; + } + return end; + } + + /** + * Write the value into an array position. Rules: + * <ul> + * <li>If there is no projection mask, collect all columns.</li> + * <li>If a selection mask is present, we previously found the index + * of the last projection column (<tt>maxField</tt>). If the current + * column is beyond that number, ignore the data and stop accepting + * columns.</li> + * <li>If the column is projected, add the data to the array.</li> + * <li>If the column is not projected, add a blank value to the + * array.</li> + * </ul> + * The above ensures that we leave no holes in the portion of the + * array that is projected (by adding blank columns where needed), + * and we just ignore columns past the end of the projected part + * of the array. (No need to fill holes at the end.) + */ + + @Override + public boolean endField() { + + // Skip the field if past the set of projected fields. + + if (currentFieldIndex > maxField) { + return false; + } + + // If the field is projected, save it. + + if (fieldProjected) { + + // Repeated var char will create as many entries as there are columns. + // If this would exceed the maximum, issue an error. Note that we do + // this only if all fields are selected; the same query will succeed if + // the user does a COUNT(*) or SELECT columns[x], columns[y], ... + + if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) { + throw UserException + .unsupportedError() + .message("Text file contains too many fields") + .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS) + .build(logger); + } + + // Save the field. + + columnWriter.setBytes(fieldBytes, currentDataPointer); + } else { + + // The field is not projected. + // Must write a value into this array position, but + // the value should be empty. + + columnWriter.setBytes(fieldBytes, 0); + } + + // Return whether the rest of the fields should be read. + + return super.endField(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java new file mode 100644 index 000000000..70c43b7ee --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +class StreamFinishedPseudoException extends RuntimeException { + + public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException(); + + private StreamFinishedPseudoException() { + super("", null, false, true); + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java new file mode 100644 index 000000000..26fade6d7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import io.netty.buffer.DrillBuf; +import io.netty.util.internal.PlatformDependent; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.compress.CompressionInputStream; + +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; + +/** + * Class that fronts an InputStream to provide a byte consumption interface. + * Also manages only reading lines to and from each split. + */ +final class TextInput { + + private final byte[] lineSeparator; + private final byte normalizedLineSeparator; + private final TextParsingSettingsV3 settings; + + private long lineCount; + private long charCount; + + /** + * The starting position in the file. + */ + private final long startPos; + private final long endPos; + + private long streamPos; + + private final Seekable seekable; + private final FSDataInputStream inputFS; + private final InputStream input; + + private final DrillBuf buffer; + private final ByteBuffer underlyingBuffer; + private final long bStart; + private final long bStartMinus1; + + private final boolean bufferReadable; + + /** + * Whether there was a possible partial line separator on the previous + * read so we dropped it and it should be appended to next read. + */ + private int remByte = -1; + + /** + * The current position in the buffer. + */ + public int bufferPtr; + + /** + * The quantity of valid data in the buffer. + */ + public int length = -1; + + private boolean endFound = false; + + /** + * Creates a new instance with the mandatory characters for handling newlines + * transparently. lineSeparator the sequence of characters that represent a + * newline, as defined in {@link Format#getLineSeparator()} + * normalizedLineSeparator the normalized newline character (as defined in + * {@link Format#getNormalizedNewline()}) that is used to replace any + * lineSeparator sequence found in the input. + */ + public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) { + this.lineSeparator = settings.getNewLineDelimiter(); + byte normalizedLineSeparator = settings.getNormalizedNewLine(); + Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable."); + boolean isCompressed = input instanceof CompressionInputStream; + Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream."); + + // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely. + if (isCompressed && endPos > 0) { + endPos = Long.MAX_VALUE; + } + + this.input = input; + this.seekable = (Seekable) input; + this.settings = settings; + + if (input instanceof FSDataInputStream) { + this.inputFS = (FSDataInputStream) input; + this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable; + } else { + this.inputFS = null; + this.bufferReadable = false; + } + + this.startPos = startPos; + this.endPos = endPos; + + this.normalizedLineSeparator = normalizedLineSeparator; + + this.buffer = readBuffer; + this.bStart = buffer.memoryAddress(); + this.bStartMinus1 = bStart -1; + this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity()); + } + + /** + * Test the input to position for read start. If the input is a non-zero split or + * splitFirstLine is enabled, input will move to appropriate complete line. + * @throws IOException for input file read errors + */ + final void start() throws IOException { + lineCount = 0; + if(startPos > 0){ + seekable.seek(startPos); + } + + updateBuffer(); + if (length > 0) { + if (startPos > 0 || settings.isSkipFirstLine()) { + + // move to next full record. + skipLines(1); + } + } + } + + + /** + * Helper method to get the most recent characters consumed since the last record started. + * May get an incomplete string since we don't support stream rewind. Returns empty string for now. + * + * @return String of last few bytes. + * @throws IOException for input file read errors + */ + public String getStringSinceMarkForError() throws IOException { + return " "; + } + + long getPos() { + return streamPos + bufferPtr; + } + + public void mark() { } + + /** + * Read some more bytes from the stream. Uses the zero copy interface if available. + * Otherwise, does byte copy. + * + * @throws IOException for input file read errors + */ + private void read() throws IOException { + if (bufferReadable) { + + if (remByte != -1) { + for (int i = 0; i <= remByte; i++) { + underlyingBuffer.put(lineSeparator[i]); + } + remByte = -1; + } + length = inputFS.read(underlyingBuffer); + + } else { + byte[] b = new byte[underlyingBuffer.capacity()]; + if (remByte != -1){ + int remBytesNum = remByte + 1; + System.arraycopy(lineSeparator, 0, b, 0, remBytesNum); + length = input.read(b, remBytesNum, b.length - remBytesNum); + remByte = -1; + } else { + length = input.read(b); + } + underlyingBuffer.put(b); + } + } + + + /** + * Read more data into the buffer. Will also manage split end conditions. + * + * @throws IOException for input file read errors + */ + private void updateBuffer() throws IOException { + streamPos = seekable.getPos(); + underlyingBuffer.clear(); + + if (endFound) { + length = -1; + return; + } + + read(); + + // check our data read allowance. + if (streamPos + length >= this.endPos) { + updateLengthBasedOnConstraint(); + } + + charCount += bufferPtr; + bufferPtr = 1; + + buffer.writerIndex(underlyingBuffer.limit()); + buffer.readerIndex(underlyingBuffer.position()); + } + + /** + * Checks to see if we can go over the end of our bytes constraint on the data. If so, + * adjusts so that we can only read to the last character of the first line that crosses + * the split boundary. + */ + private void updateLengthBasedOnConstraint() { + final long max = bStart + length; + for (long m = bStart + (endPos - streamPos); m < max; m++) { + for (int i = 0; i < lineSeparator.length; i++) { + long mPlus = m + i; + if (mPlus < max) { + // we found a line separator and don't need to consult the next byte. + if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) { + length = (int) (mPlus - bStart) + 1; + endFound = true; + return; + } + } else { + // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read. + remByte = i; + length = length - i; + return; + } + } + } + } + + /** + * Get next byte from stream. Also maintains the current line count. Will throw a + * {@link StreamFinishedPseudoException} when the stream has run out of bytes. + * + * @return next byte from stream. + * @throws IOException for input file read errors + */ + public final byte nextChar() throws IOException { + byte byteChar = nextCharNoNewLineCheck(); + int bufferPtrTemp = bufferPtr - 1; + if (byteChar == lineSeparator[0]) { + for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) { + if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) { + return byteChar; + } + } + + lineCount++; + byteChar = normalizedLineSeparator; + + // we don't need to update buffer position if line separator is one byte long + if (lineSeparator.length > 1) { + bufferPtr += (lineSeparator.length - 1); + if (bufferPtr >= length) { + if (length != -1) { + updateBuffer(); + } else { + throw StreamFinishedPseudoException.INSTANCE; + } + } + } + } + + return byteChar; + } + + /** + * Get next byte from stream. Do no maintain any line count Will throw a StreamFinishedPseudoException + * when the stream has run out of bytes. + * + * @return next byte from stream. + * @throws IOException for input file read errors + */ + public final byte nextCharNoNewLineCheck() throws IOException { + + if (length == -1) { + throw StreamFinishedPseudoException.INSTANCE; + } + + rangeCheck(buffer, bufferPtr - 1, bufferPtr); + + byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr); + + if (bufferPtr >= length) { + if (length != -1) { + updateBuffer(); + bufferPtr--; + } else { + throw StreamFinishedPseudoException.INSTANCE; + } + } + + bufferPtr++; + return byteChar; + } + + /** + * Number of lines read since the start of this split. + * @return current line count + */ + public final long lineCount() { + return lineCount; + } + + /** + * Skip forward the number of line delimiters. If you are in the middle of a line, + * a value of 1 will skip to the start of the next record. + * + * @param lines Number of lines to skip. + * @throws IOException for input file read errors + * @throws IllegalArgumentException if unable to skip the requested number + * of lines + */ + public final void skipLines(int lines) throws IOException { + if (lines < 1) { + return; + } + long expectedLineCount = this.lineCount + lines; + + try { + do { + nextChar(); + } while (lineCount < expectedLineCount); + if (lineCount < lines) { + throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached"); + } + } catch (EOFException ex) { + throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached"); + } + } + + public final long charCount() { + return charCount + bufferPtr; + } + + public long getLineCount() { + return lineCount; + } + + public void close() throws IOException{ + input.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java new file mode 100644 index 000000000..48c184991 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + + +/** + * Base class for producing output record batches while dealing with + * text files. Defines the interface called from text parsers to create + * the corresponding value vectors (record batch). + */ + +abstract class TextOutput { + + public abstract void startRecord(); + + /** + * Start processing a new field within a record. + * @param index index within the record + */ + public abstract void startField(int index); + + /** + * End processing a field within a record. + * @return true if engine should continue processing record. false if rest of record can be skipped. + */ + public abstract boolean endField(); + + /** + * Shortcut that lets the output know that we are closing ending a field with no data. + * @return true if engine should continue processing record. false if rest of record can be skipped. + */ + public abstract boolean endEmptyField(); + + /** + * Add the provided data but drop any whitespace. + * @param data character to append + */ + public void appendIgnoringWhitespace(byte data) { + if (TextReader.isWhite(data)) { + // noop + } else { + append(data); + } + } + + /** + * Appends a byte to the output character data buffer + * @param data current byte read + */ + public abstract void append(byte data); + + /** + * Completes the processing of a given record. Also completes the processing of the + * last field being read. + */ + public abstract void finishRecord(); + + /** + * Return the total number of records (across batches) processed + */ + public abstract long getRecordCount(); + + /** + * Indicates if the current batch is full and reading for this batch + * should stop. + * + * @return true if the batch is full and the reader must exit to allow + * the batch to be sent downstream, false if the reader may continue to + * add rows to the current batch + */ + + public abstract boolean isFull(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java new file mode 100644 index 000000000..86cad4c88 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.io.IOException; + +import com.univocity.parsers.common.ParsingContext; + +class TextParsingContext implements ParsingContext { + + private final TextInput input; + private final TextOutput output; + protected boolean stopped; + + private int[] extractedIndexes; + + public TextParsingContext(TextInput input, TextOutput output) { + this.input = input; + this.output = output; + } + + /** + * {@inheritDoc} + */ + @Override + public void stop() { + stopped = true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isStopped() { + return stopped; + } + + /** + * {@inheritDoc} + */ + @Override + public long currentLine() { + return input.lineCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public long currentChar() { + return input.charCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public int currentColumn() { + return -1; + } + + /** + * {@inheritDoc} + */ + @Override + public String[] headers() { + return new String[]{}; + } + + /** + * {@inheritDoc} + */ + @Override + public int[] extractedFieldIndexes() { + return extractedIndexes; + } + + /** + * {@inheritDoc} + */ + @Override + public long currentRecord() { + return output.getRecordCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public String currentParsedContent() { + try { + return input.getStringSinceMarkForError(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void skipLines(int lines) { + } + + @Override + public boolean columnsReordered() { + return false; + } + + public boolean isFull() { + return output.isFull(); + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java new file mode 100644 index 000000000..0341b4554 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig; + +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; + +// TODO: Remove the "V3" suffix once the V2 version is retired. +public class TextParsingSettingsV3 { + + public static final TextParsingSettingsV3 DEFAULT = new TextParsingSettingsV3(); + + private String emptyValue = null; + private boolean parseUnescapedQuotes = true; + private byte quote = b('"'); + private byte quoteEscape = b('"'); + private byte delimiter = b(','); + private byte comment = b('#'); + + private long maxCharsPerColumn = Character.MAX_VALUE; + private byte normalizedNewLine = b('\n'); + private byte[] newLineDelimiter = {normalizedNewLine}; + private boolean ignoreLeadingWhitespaces; + private boolean ignoreTrailingWhitespaces; + private String lineSeparatorString = "\n"; + private boolean skipFirstLine; + + private boolean headerExtractionEnabled; + private boolean useRepeatedVarChar = true; + + public void set(TextFormatConfig config){ + this.quote = bSafe(config.getQuote(), "quote"); + this.quoteEscape = bSafe(config.getEscape(), "escape"); + this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8); + this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter"); + this.comment = bSafe(config.getComment(), "comment"); + this.skipFirstLine = config.isSkipFirstLine(); + this.headerExtractionEnabled = config.isHeaderExtractionEnabled(); + if (this.headerExtractionEnabled) { + // In case of header TextRecordReader will use set of VarChar vectors vs RepeatedVarChar + this.useRepeatedVarChar = false; + } + } + + public byte getComment() { + return comment; + } + + public boolean isSkipFirstLine() { + return skipFirstLine; + } + + public void setSkipFirstLine(boolean skipFirstLine) { + this.skipFirstLine = skipFirstLine; + } + + public boolean isUseRepeatedVarChar() { + return useRepeatedVarChar; + } + + public void setUseRepeatedVarChar(boolean useRepeatedVarChar) { + this.useRepeatedVarChar = useRepeatedVarChar; + } + + private static byte bSafe(char c, String name) { + if (c > Byte.MAX_VALUE) { + throw new IllegalArgumentException(String.format("Failure validating configuration option %s. Expected a " + + "character between 0 and 127 but value was actually %d.", name, (int) c)); + } + return (byte) c; + } + + private static byte b(char c) { + return (byte) c; + } + + public byte[] getNewLineDelimiter() { + return newLineDelimiter; + } + + /** + * Returns the character used for escaping values where the field delimiter is + * part of the value. Defaults to '"' + * + * @return the quote character + */ + public byte getQuote() { + return quote; + } + + /** + * Defines the character used for escaping values where the field delimiter is + * part of the value. Defaults to '"' + * + * @param quote + * the quote character + */ + public void setQuote(byte quote) { + this.quote = quote; + } + + public String getLineSeparatorString() { + return lineSeparatorString; + } + + /** + * Identifies whether or not a given character is used for escaping values + * where the field delimiter is part of the value + * + * @param ch + * the character to be verified + * @return true if the given character is the character used for escaping + * values, false otherwise + */ + public boolean isQuote(byte ch) { + return this.quote == ch; + } + + /** + * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"' + * @return the quote escape character + */ + public byte getQuoteEscape() { + return quoteEscape; + } + + /** + * Defines the character used for escaping quotes inside an already quoted + * value. Defaults to '"' + * + * @param quoteEscape + * the quote escape character + */ + public void setQuoteEscape(byte quoteEscape) { + this.quoteEscape = quoteEscape; + } + + /** + * Identifies whether or not a given character is used for escaping quotes + * inside an already quoted value. + * + * @param ch + * the character to be verified + * @return true if the given character is the quote escape character, false + * otherwise + */ + public boolean isQuoteEscape(byte ch) { + return this.quoteEscape == ch; + } + + /** + * Returns the field delimiter character. Defaults to ',' + * @return the field delimiter character + */ + public byte getDelimiter() { + return delimiter; + } + + /** + * Defines the field delimiter character. Defaults to ',' + * @param delimiter the field delimiter character + */ + public void setDelimiter(byte delimiter) { + this.delimiter = delimiter; + } + + /** + * Identifies whether or not a given character represents a field delimiter + * @param ch the character to be verified + * @return true if the given character is the field delimiter character, false otherwise + */ + public boolean isDelimiter(byte ch) { + return this.delimiter == ch; + } + + /** + * Returns the String representation of an empty value (defaults to null) + * + * <p> + * When reading, if the parser does not read any character from the input, and + * the input is within quotes, the empty is used instead of an empty string + * + * @return the String representation of an empty value + */ + public String getEmptyValue() { + return emptyValue; + } + + /** + * Sets the String representation of an empty value (defaults to null) + * + * <p> + * When reading, if the parser does not read any character from the input, and + * the input is within quotes, the empty is used instead of an empty string + * + * @param emptyValue + * the String representation of an empty value + */ + public void setEmptyValue(String emptyValue) { + this.emptyValue = emptyValue; + } + + /** + * Indicates whether the CSV parser should accept unescaped quotes inside + * quoted values and parse them normally. Defaults to {@code true}. + * + * @return a flag indicating whether or not the CSV parser should accept + * unescaped quotes inside quoted values. + */ + public boolean isParseUnescapedQuotes() { + return parseUnescapedQuotes; + } + + /** + * Configures how to handle unescaped quotes inside quoted values. If set to + * {@code true}, the parser will parse the quote normally as part of the + * value. If set the {@code false}, a + * {@link com.univocity.parsers.common.TextParsingException} will be thrown. + * Defaults to {@code true}. + * + * @param parseUnescapedQuotes + * indicates whether or not the CSV parser should accept unescaped + * quotes inside quoted values. + */ + public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) { + this.parseUnescapedQuotes = parseUnescapedQuotes; + } + + /** + * Indicates whether or not the first valid record parsed from the input + * should be considered as the row containing the names of each column + * + * @return true if the first valid record parsed from the input should be + * considered as the row containing the names of each column, false + * otherwise + */ + public boolean isHeaderExtractionEnabled() { + return headerExtractionEnabled; + } + + /** + * Defines whether or not the first valid record parsed from the input should + * be considered as the row containing the names of each column + * + * @param headerExtractionEnabled + * a flag indicating whether the first valid record parsed from the + * input should be considered as the row containing the names of each + * column + */ + public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) { + this.headerExtractionEnabled = headerExtractionEnabled; + } + + public long getMaxCharsPerColumn() { + return maxCharsPerColumn; + } + + public void setMaxCharsPerColumn(long maxCharsPerColumn) { + this.maxCharsPerColumn = maxCharsPerColumn; + } + + public void setComment(byte comment) { + this.comment = comment; + } + + public byte getNormalizedNewLine() { + return normalizedNewLine; + } + + public void setNormalizedNewLine(byte normalizedNewLine) { + this.normalizedNewLine = normalizedNewLine; + } + + public boolean isIgnoreLeadingWhitespaces() { + return ignoreLeadingWhitespaces; + } + + public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) { + this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces; + } + + public boolean isIgnoreTrailingWhitespaces() { + return ignoreTrailingWhitespaces; + } + + public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) { + this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java new file mode 100644 index 000000000..17a076c0c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import io.netty.buffer.DrillBuf; + +import java.io.IOException; + +import org.apache.drill.common.exceptions.UserException; + +import com.univocity.parsers.common.TextParsingException; + +/******************************************************************************* + * Portions Copyright 2014 uniVocity Software Pty Ltd + ******************************************************************************/ + +/** + * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and + * DrillBuf support. + */ +public final class TextReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class); + + private static final byte NULL_BYTE = (byte) '\0'; + + public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024; + + private final TextParsingContext context; + + private final TextParsingSettingsV3 settings; + + private final TextInput input; + private final TextOutput output; + private final DrillBuf workBuf; + + private byte ch; + + // index of the field within this record + private int fieldIndex; + + /** Behavior settings **/ + private final boolean ignoreTrailingWhitespace; + private final boolean ignoreLeadingWhitespace; + private final boolean parseUnescapedQuotes; + + /** Key Characters **/ + private final byte comment; + private final byte delimiter; + private final byte quote; + private final byte quoteEscape; + private final byte newLine; + + /** + * The CsvParser supports all settings provided by {@link TextParsingSettingsV3}, + * and requires this configuration to be properly initialized. + * @param settings the parser configuration + * @param input input stream + * @param output interface to produce output record batch + * @param workBuf working buffer to handle whitespace + */ + public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) { + this.context = new TextParsingContext(input, output); + this.workBuf = workBuf; + this.settings = settings; + + this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces(); + this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces(); + this.parseUnescapedQuotes = settings.isParseUnescapedQuotes(); + this.delimiter = settings.getDelimiter(); + this.quote = settings.getQuote(); + this.quoteEscape = settings.getQuoteEscape(); + this.newLine = settings.getNormalizedNewLine(); + this.comment = settings.getComment(); + + this.input = input; + this.output = output; + } + + public TextOutput getOutput() { return output; } + + /** + * Check if the given byte is a white space. As per the univocity text reader + * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed + * we have an additional check to make sure its not negative + */ + static final boolean isWhite(byte b){ + return b <= ' ' && b > -1; + } + + /** + * Inform the output interface to indicate we are starting a new record batch + */ + public void resetForNextBatch() { } + + public long getPos() { return input.getPos(); } + + /** + * Function encapsulates parsing an entire record, delegates parsing of the + * fields to parseField() function. + * We mark the start of the record and if there are any failures encountered (OOM for eg) + * then we reset the input stream to the marked position + * @return true if parsing this record was successful; false otherwise + * @throws IOException for input file read errors + */ + private boolean parseRecord() throws IOException { + final byte newLine = this.newLine; + final TextInput input = this.input; + + input.mark(); + + fieldIndex = 0; + if (ignoreLeadingWhitespace && isWhite(ch)) { + skipWhitespace(); + } + + output.startRecord(); + int fieldsWritten = 0; + try { + @SuppressWarnings("unused") + boolean earlyTerm = false; + while (ch != newLine) { + earlyTerm = ! parseField(); + fieldsWritten++; + if (ch != newLine) { + ch = input.nextChar(); + if (ch == newLine) { + output.startField(fieldsWritten++); + output.endEmptyField(); + break; + } + } + + // Disabling early termination. See DRILL-5914 + +// if (earlyTerm) { +// if (ch != newLine) { +// input.skipLines(1); +// } +// break; +// } + } + output.finishRecord(); + } catch (StreamFinishedPseudoException e) { + + // if we've written part of a field or all of a field, we should send this row. + + if (fieldsWritten == 0) { + throw e; + } else { + output.finishRecord(); + } + } + return true; + } + + /** + * Function parses an individual field and ignores any white spaces encountered + * by not appending it to the output vector + * @throws IOException for input file read errors + */ + private void parseValueIgnore() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + appendIgnoringWhitespace(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + public void appendIgnoringWhitespace(byte data) { + if (! isWhite(data)) { + output.append(data); + } + } + + /** + * Function parses an individual field and appends all characters till the delimeter (or newline) + * to the output, including white spaces + * @throws IOException for input file read errors + */ + private void parseValueAll() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + output.append(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + /** + * Function simply delegates the parsing of a single field to the actual + * implementation based on parsing config + * + * @throws IOException + * for input file read errors + */ + private void parseValue() throws IOException { + if (ignoreTrailingWhitespace) { + parseValueIgnore(); + } else { + parseValueAll(); + } + } + + /** + * Recursive function invoked when a quote is encountered. Function also + * handles the case when there are non-white space characters in the field + * after the quoted value. + * @param prev previous byte read + * @throws IOException for input file read errors + */ + private void parseQuotedValue(byte prev) throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + final byte quote = this.quote; + + ch = input.nextCharNoNewLineCheck(); + + while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) { + if (ch != quote) { + if (prev == quote) { // unescaped quote detected + if (parseUnescapedQuotes) { + output.append(quote); + output.append(ch); + parseQuotedValue(ch); + break; + } else { + throw new TextParsingException( + context, + "Unescaped quote character '" + + quote + + "' inside quoted value of CSV field. To allow unescaped quotes, " + + "set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. " + + "Cannot parse CSV input."); + } + } + output.append(ch); + prev = ch; + } else if (prev == quoteEscape) { + output.append(quote); + prev = NULL_BYTE; + } else { + prev = ch; + } + ch = input.nextCharNoNewLineCheck(); + } + + // Handles whitespace after quoted value: + // Whitespace are ignored (i.e., ch <= ' ') if they are not used as delimiters (i.e., ch != ' ') + // For example, in tab-separated files (TSV files), '\t' is used as delimiter and should not be ignored + // Content after whitespace may be parsed if 'parseUnescapedQuotes' is enabled. + if (ch != newLine && ch <= ' ' && ch != delimiter) { + final DrillBuf workBuf = this.workBuf; + workBuf.resetWriterIndex(); + do { + // saves whitespace after value + workBuf.writeByte(ch); + ch = input.nextChar(); + // found a new line, go to next record. + if (ch == newLine) { + return; + } + } while (ch <= ' ' && ch != delimiter); + + // there's more stuff after the quoted value, not only empty spaces. + if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) { + + output.append(quote); + for(int i =0; i < workBuf.writerIndex(); i++){ + output.append(workBuf.getByte(i)); + } + // the next character is not the escape character, put it there + if (ch != quoteEscape) { + output.append(ch); + } + // sets this character as the previous character (may be escaping) + // calls recursively to keep parsing potentially quoted content + parseQuotedValue(ch); + } + } + + if (!(ch == delimiter || ch == newLine)) { + throw new TextParsingException(context, "Unexpected character '" + ch + + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input."); + } + } + + /** + * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function + * @return true if more rows can be read, false if not + * @throws IOException for input file read errors + */ + private final boolean parseField() throws IOException { + + output.startField(fieldIndex++); + + if (isWhite(ch) && ignoreLeadingWhitespace) { + skipWhitespace(); + } + + // Delimiter? Then this is an empty field. + + if (ch == delimiter) { + return output.endEmptyField(); + } + + // Have the first character of the field. Parse and save the + // field, even if we hit EOF. (An EOF identifies a last line + // that contains data, but is not terminated with a newline.) + + try { + if (ch == quote) { + parseQuotedValue(NULL_BYTE); + } else { + parseValue(); + } + return output.endField(); + } catch (StreamFinishedPseudoException e) { + return output.endField(); + } + } + + /** + * Helper function to skip white spaces occurring at the current input stream. + * @throws IOException for input file read errors + */ + private void skipWhitespace() throws IOException { + final byte delimiter = this.delimiter; + final byte newLine = this.newLine; + final TextInput input = this.input; + + while (isWhite(ch) && ch != delimiter && ch != newLine) { + ch = input.nextChar(); + } + } + + /** + * Starting point for the reader. Sets up the input interface. + * @throws IOException for input file read errors + */ + public final void start() throws IOException { + context.stopped = false; + input.start(); + } + + /** + * Parses the next record from the input. Will skip the line if its a comment, + * this is required when the file contains headers + * @throws IOException for input file read errors + */ + public final boolean parseNext() throws IOException { + try { + while (! context.stopped) { + ch = input.nextChar(); + if (ch == comment) { + input.skipLines(1); + continue; + } + break; + } + final long initialLineNumber = input.lineCount(); + boolean success = parseRecord(); + if (initialLineNumber + 1 < input.lineCount()) { + throw new TextParsingException(context, "Cannot use newline character within quoted string"); + } + + return success; + } catch (UserException ex) { + stopParsing(); + throw ex; + } catch (StreamFinishedPseudoException ex) { + stopParsing(); + return false; + } catch (Exception ex) { + try { + throw handleException(ex); + } finally { + stopParsing(); + } + } + } + + private void stopParsing() { } + + private String displayLineSeparators(String str, boolean addNewLine) { + if (addNewLine) { + if (str.contains("\r\n")) { + str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t"); + } else if (str.contains("\n")) { + str = str.replaceAll("\\n", "[\\\\n]\n\t"); + } else { + str = str.replaceAll("\\r", "[\\\\r]\r\t"); + } + } else { + str = str.replaceAll("\\n", "\\\\n"); + str = str.replaceAll("\\r", "\\\\r"); + } + return str; + } + + /** + * Helper method to handle exceptions caught while processing text files and generate better error messages associated with + * the exception. + * @param ex Exception raised + * @throws IOException for input file read errors + */ + private TextParsingException handleException(Exception ex) throws IOException { + + if (ex instanceof TextParsingException) { + throw (TextParsingException) ex; + } + + String message = null; + String tmp = input.getStringSinceMarkForError(); + char[] chars = tmp.toCharArray(); + if (chars != null) { + int length = chars.length; + if (length > settings.getMaxCharsPerColumn()) { + message = "Length of parsed input (" + length + + ") exceeds the maximum number of characters defined in your parser settings (" + + settings.getMaxCharsPerColumn() + "). "; + } + + if (tmp.contains("\n") || tmp.contains("\r")) { + tmp = displayLineSeparators(tmp, true); + String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false); + message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '" + + lineSeparator + "'. Parsed content:\n\t" + tmp; + } + + int nullCharacterCount = 0; + // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError + int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length; + StringBuilder s = new StringBuilder(maxLength); + for (int i = 0; i < maxLength; i++) { + if (chars[i] == '\0') { + s.append('\\'); + s.append('0'); + nullCharacterCount++; + } else { + s.append(chars[i]); + } + } + tmp = s.toString(); + + if (nullCharacterCount > 0) { + message += "\nIdentified " + + nullCharacterCount + + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t" + + tmp; + } + } + + UserException.Builder builder; + if (ex instanceof UserException) { + builder = ((UserException) ex).rebuild(); + } else { + builder = UserException + .dataReadError(ex) + .message(message); + } + throw builder + .addContext("Line", context.currentLine()) + .addContext("Record", context.currentRecord()) + .build(logger); + } + + /** + * Finish the processing of a batch, indicates to the output + * interface to wrap up the batch + */ + public void finishBatch() { } + + /** + * Invoked once there are no more records and we are done with the + * current record reader to clean up state. + * @throws IOException for input file read errors + */ + public void close() throws IOException { + input.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java new file mode 100644 index 000000000..aced5adfd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Version 3 of the text reader. Hosts the "compliant" text reader on + * the row set framework. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; |