diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java | 508 |
1 files changed, 508 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java new file mode 100644 index 000000000..17a076c0c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import io.netty.buffer.DrillBuf; + +import java.io.IOException; + +import org.apache.drill.common.exceptions.UserException; + +import com.univocity.parsers.common.TextParsingException; + +/******************************************************************************* + * Portions Copyright 2014 uniVocity Software Pty Ltd + ******************************************************************************/ + +/** + * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and + * DrillBuf support. + */ +public final class TextReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class); + + private static final byte NULL_BYTE = (byte) '\0'; + + public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024; + + private final TextParsingContext context; + + private final TextParsingSettingsV3 settings; + + private final TextInput input; + private final TextOutput output; + private final DrillBuf workBuf; + + private byte ch; + + // index of the field within this record + private int fieldIndex; + + /** Behavior settings **/ + private final boolean ignoreTrailingWhitespace; + private final boolean ignoreLeadingWhitespace; + private final boolean parseUnescapedQuotes; + + /** Key Characters **/ + private final byte comment; + private final byte delimiter; + private final byte quote; + private final byte quoteEscape; + private final byte newLine; + + /** + * The CsvParser supports all settings provided by {@link TextParsingSettingsV3}, + * and requires this configuration to be properly initialized. + * @param settings the parser configuration + * @param input input stream + * @param output interface to produce output record batch + * @param workBuf working buffer to handle whitespace + */ + public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) { + this.context = new TextParsingContext(input, output); + this.workBuf = workBuf; + this.settings = settings; + + this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces(); + this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces(); + this.parseUnescapedQuotes = settings.isParseUnescapedQuotes(); + this.delimiter = settings.getDelimiter(); + this.quote = settings.getQuote(); + this.quoteEscape = settings.getQuoteEscape(); + this.newLine = settings.getNormalizedNewLine(); + this.comment = settings.getComment(); + + this.input = input; + this.output = output; + } + + public TextOutput getOutput() { return output; } + + /** + * Check if the given byte is a white space. As per the univocity text reader + * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed + * we have an additional check to make sure its not negative + */ + static final boolean isWhite(byte b){ + return b <= ' ' && b > -1; + } + + /** + * Inform the output interface to indicate we are starting a new record batch + */ + public void resetForNextBatch() { } + + public long getPos() { return input.getPos(); } + + /** + * Function encapsulates parsing an entire record, delegates parsing of the + * fields to parseField() function. + * We mark the start of the record and if there are any failures encountered (OOM for eg) + * then we reset the input stream to the marked position + * @return true if parsing this record was successful; false otherwise + * @throws IOException for input file read errors + */ + private boolean parseRecord() throws IOException { + final byte newLine = this.newLine; + final TextInput input = this.input; + + input.mark(); + + fieldIndex = 0; + if (ignoreLeadingWhitespace && isWhite(ch)) { + skipWhitespace(); + } + + output.startRecord(); + int fieldsWritten = 0; + try { + @SuppressWarnings("unused") + boolean earlyTerm = false; + while (ch != newLine) { + earlyTerm = ! parseField(); + fieldsWritten++; + if (ch != newLine) { + ch = input.nextChar(); + if (ch == newLine) { + output.startField(fieldsWritten++); + output.endEmptyField(); + break; + } + } + + // Disabling early termination. See DRILL-5914 + +// if (earlyTerm) { +// if (ch != newLine) { +// input.skipLines(1); +// } +// break; +// } + } + output.finishRecord(); + } catch (StreamFinishedPseudoException e) { + + // if we've written part of a field or all of a field, we should send this row. + + if (fieldsWritten == 0) { + throw e; + } else { + output.finishRecord(); + } + } + return true; + } + + /** + * Function parses an individual field and ignores any white spaces encountered + * by not appending it to the output vector + * @throws IOException for input file read errors + */ + private void parseValueIgnore() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + appendIgnoringWhitespace(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + public void appendIgnoringWhitespace(byte data) { + if (! isWhite(data)) { + output.append(data); + } + } + + /** + * Function parses an individual field and appends all characters till the delimeter (or newline) + * to the output, including white spaces + * @throws IOException for input file read errors + */ + private void parseValueAll() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + output.append(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + /** + * Function simply delegates the parsing of a single field to the actual + * implementation based on parsing config + * + * @throws IOException + * for input file read errors + */ + private void parseValue() throws IOException { + if (ignoreTrailingWhitespace) { + parseValueIgnore(); + } else { + parseValueAll(); + } + } + + /** + * Recursive function invoked when a quote is encountered. Function also + * handles the case when there are non-white space characters in the field + * after the quoted value. + * @param prev previous byte read + * @throws IOException for input file read errors + */ + private void parseQuotedValue(byte prev) throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + final byte quote = this.quote; + + ch = input.nextCharNoNewLineCheck(); + + while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) { + if (ch != quote) { + if (prev == quote) { // unescaped quote detected + if (parseUnescapedQuotes) { + output.append(quote); + output.append(ch); + parseQuotedValue(ch); + break; + } else { + throw new TextParsingException( + context, + "Unescaped quote character '" + + quote + + "' inside quoted value of CSV field. To allow unescaped quotes, " + + "set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. " + + "Cannot parse CSV input."); + } + } + output.append(ch); + prev = ch; + } else if (prev == quoteEscape) { + output.append(quote); + prev = NULL_BYTE; + } else { + prev = ch; + } + ch = input.nextCharNoNewLineCheck(); + } + + // Handles whitespace after quoted value: + // Whitespace are ignored (i.e., ch <= ' ') if they are not used as delimiters (i.e., ch != ' ') + // For example, in tab-separated files (TSV files), '\t' is used as delimiter and should not be ignored + // Content after whitespace may be parsed if 'parseUnescapedQuotes' is enabled. + if (ch != newLine && ch <= ' ' && ch != delimiter) { + final DrillBuf workBuf = this.workBuf; + workBuf.resetWriterIndex(); + do { + // saves whitespace after value + workBuf.writeByte(ch); + ch = input.nextChar(); + // found a new line, go to next record. + if (ch == newLine) { + return; + } + } while (ch <= ' ' && ch != delimiter); + + // there's more stuff after the quoted value, not only empty spaces. + if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) { + + output.append(quote); + for(int i =0; i < workBuf.writerIndex(); i++){ + output.append(workBuf.getByte(i)); + } + // the next character is not the escape character, put it there + if (ch != quoteEscape) { + output.append(ch); + } + // sets this character as the previous character (may be escaping) + // calls recursively to keep parsing potentially quoted content + parseQuotedValue(ch); + } + } + + if (!(ch == delimiter || ch == newLine)) { + throw new TextParsingException(context, "Unexpected character '" + ch + + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input."); + } + } + + /** + * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function + * @return true if more rows can be read, false if not + * @throws IOException for input file read errors + */ + private final boolean parseField() throws IOException { + + output.startField(fieldIndex++); + + if (isWhite(ch) && ignoreLeadingWhitespace) { + skipWhitespace(); + } + + // Delimiter? Then this is an empty field. + + if (ch == delimiter) { + return output.endEmptyField(); + } + + // Have the first character of the field. Parse and save the + // field, even if we hit EOF. (An EOF identifies a last line + // that contains data, but is not terminated with a newline.) + + try { + if (ch == quote) { + parseQuotedValue(NULL_BYTE); + } else { + parseValue(); + } + return output.endField(); + } catch (StreamFinishedPseudoException e) { + return output.endField(); + } + } + + /** + * Helper function to skip white spaces occurring at the current input stream. + * @throws IOException for input file read errors + */ + private void skipWhitespace() throws IOException { + final byte delimiter = this.delimiter; + final byte newLine = this.newLine; + final TextInput input = this.input; + + while (isWhite(ch) && ch != delimiter && ch != newLine) { + ch = input.nextChar(); + } + } + + /** + * Starting point for the reader. Sets up the input interface. + * @throws IOException for input file read errors + */ + public final void start() throws IOException { + context.stopped = false; + input.start(); + } + + /** + * Parses the next record from the input. Will skip the line if its a comment, + * this is required when the file contains headers + * @throws IOException for input file read errors + */ + public final boolean parseNext() throws IOException { + try { + while (! context.stopped) { + ch = input.nextChar(); + if (ch == comment) { + input.skipLines(1); + continue; + } + break; + } + final long initialLineNumber = input.lineCount(); + boolean success = parseRecord(); + if (initialLineNumber + 1 < input.lineCount()) { + throw new TextParsingException(context, "Cannot use newline character within quoted string"); + } + + return success; + } catch (UserException ex) { + stopParsing(); + throw ex; + } catch (StreamFinishedPseudoException ex) { + stopParsing(); + return false; + } catch (Exception ex) { + try { + throw handleException(ex); + } finally { + stopParsing(); + } + } + } + + private void stopParsing() { } + + private String displayLineSeparators(String str, boolean addNewLine) { + if (addNewLine) { + if (str.contains("\r\n")) { + str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t"); + } else if (str.contains("\n")) { + str = str.replaceAll("\\n", "[\\\\n]\n\t"); + } else { + str = str.replaceAll("\\r", "[\\\\r]\r\t"); + } + } else { + str = str.replaceAll("\\n", "\\\\n"); + str = str.replaceAll("\\r", "\\\\r"); + } + return str; + } + + /** + * Helper method to handle exceptions caught while processing text files and generate better error messages associated with + * the exception. + * @param ex Exception raised + * @throws IOException for input file read errors + */ + private TextParsingException handleException(Exception ex) throws IOException { + + if (ex instanceof TextParsingException) { + throw (TextParsingException) ex; + } + + String message = null; + String tmp = input.getStringSinceMarkForError(); + char[] chars = tmp.toCharArray(); + if (chars != null) { + int length = chars.length; + if (length > settings.getMaxCharsPerColumn()) { + message = "Length of parsed input (" + length + + ") exceeds the maximum number of characters defined in your parser settings (" + + settings.getMaxCharsPerColumn() + "). "; + } + + if (tmp.contains("\n") || tmp.contains("\r")) { + tmp = displayLineSeparators(tmp, true); + String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false); + message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '" + + lineSeparator + "'. Parsed content:\n\t" + tmp; + } + + int nullCharacterCount = 0; + // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError + int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length; + StringBuilder s = new StringBuilder(maxLength); + for (int i = 0; i < maxLength; i++) { + if (chars[i] == '\0') { + s.append('\\'); + s.append('0'); + nullCharacterCount++; + } else { + s.append(chars[i]); + } + } + tmp = s.toString(); + + if (nullCharacterCount > 0) { + message += "\nIdentified " + + nullCharacterCount + + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t" + + tmp; + } + } + + UserException.Builder builder; + if (ex instanceof UserException) { + builder = ((UserException) ex).rebuild(); + } else { + builder = UserException + .dataReadError(ex) + .message(message); + } + throw builder + .addContext("Line", context.currentLine()) + .addContext("Record", context.currentRecord()) + .build(logger); + } + + /** + * Finish the processing of a batch, indicates to the output + * interface to wrap up the batch + */ + public void finishBatch() { } + + /** + * Invoked once there are no more records and we are done with the + * current record reader to clean up state. + * @throws IOException for input file read errors + */ + public void close() throws IOException { + input.close(); + } +} |