Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java | 32
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java | 14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java | 84
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java | 36
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java | 57
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java | 442
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java | 52
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java | 44
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java | 301
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java | 27
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java | 165
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java | 269
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java | 62
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java | 267
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java | 137
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java | 29
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java | 368
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java | 88
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java | 126
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java | 305
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java | 508
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java | 22
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java | 2
-rw-r--r--  exec/java-exec/src/main/resources/drill-module.conf | 2
38 files changed, 3155 insertions(+), 361 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 93adda17c..25ac2c9f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -713,6 +713,14 @@ public final class ExecConstants {
public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY,
new OptionDescription("Enables the text reader that complies with the RFC 4180 standard for text/csv files."));
+ public static final String ENABLE_V3_TEXT_READER_KEY = "exec.storage.enable_v3_text_reader";
+ public static final OptionValidator ENABLE_V3_TEXT_READER = new BooleanValidator(ENABLE_V3_TEXT_READER_KEY,
+ new OptionDescription("Enables the row set based version of the text/csv reader."));
+
+ public static final String MIN_READER_WIDTH_KEY = "exec.storage.min_width";
+ public static final OptionValidator MIN_READER_WIDTH = new LongValidator(MIN_READER_WIDTH_KEY,
+ new OptionDescription("Min width for text readers, mostly for testing."));
+
public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
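A minimal sketch of how a reader might consult these new options at runtime; it assumes a FragmentContext named "context" is in scope and that the usual typed accessors exist on Drill's option set:

    // Sketch only: "context" is an assumed FragmentContext.
    boolean useV3 = context.getOptions().getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
    long minWidth = context.getOptions().getLong(ExecConstants.MIN_READER_WIDTH_KEY);
    if (useV3) {
      // Choose the row-set based text reader path.
    }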
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
index 84aebdcd0..ebd7288a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
@@ -31,7 +31,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
public abstract class AbstractExchange extends AbstractSingle implements Exchange {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class);
// Ephemeral info for generating execution fragments.
protected int senderMajorFragmentId;
@@ -77,7 +76,7 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
}
/**
- * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrances
+ * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrences
* in given endpoint list.
*
* @param fragmentEndpoints Drillbit endpoint assignments of fragments.
@@ -111,7 +110,6 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
setupSenders(senderLocations);
}
-
@Override
public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
this.receiverMajorFragmentId = majorFragmentId;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
index 3bb1e5403..a5229f9d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.drill.exec.store.dfs.FileSelection;
public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class);
public AbstractFileGroupScan(String userName) {
super(userName);
@@ -46,5 +45,4 @@ public abstract class AbstractFileGroupScan extends AbstractGroupScan implements
public boolean supportsPartitionFilterPushdown() {
return true;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
index d744db424..db4c73d9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
@@ -25,8 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-public abstract class AbstractSubScan extends AbstractBase implements SubScan{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
+public abstract class AbstractSubScan extends AbstractBase implements SubScan {
public AbstractSubScan(String userName) {
super(userName);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 063e26bef..1abc3d804 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -499,7 +499,6 @@ public class ScanBatch implements CloseableRecordBatch {
schemaChanged = false;
}
- @SuppressWarnings("resource")
private <T extends ValueVector> T addField(MaterializedField field,
Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
Map<String, ValueVector> fieldVectorMap;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index c9d97bf72..436aea06b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -42,7 +42,6 @@ import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -77,21 +76,21 @@ import java.util.List;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
+
+ private static final String EMPTY_STRING = "";
+
private Projector projector;
private List<ValueVector> allocationVectors;
private List<ComplexWriter> complexWriters;
private List<FieldReference> complexFieldReferencesList;
private boolean hasRemainder = false;
- private int remainderIndex = 0;
+ private int remainderIndex;
private int recordCount;
-
private ProjectMemoryManager memoryManager;
-
-
- private static final String EMPTY_STRING = "";
private boolean first = true;
private boolean wasNone = false; // whether a NONE iter outcome was already seen
+ private ColumnExplorer columnExplorer;
private class ClassifierResult {
public boolean isStar = false;
@@ -114,6 +113,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
+ columnExplorer = new ColumnExplorer(context.getOptions());
}
@Override
@@ -121,14 +121,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return recordCount;
}
-
@Override
protected void killIncoming(final boolean sendUpstream) {
super.killIncoming(sendUpstream);
hasRemainder = false;
}
-
@Override
public IterOutcome innerNext() {
if (wasNone) {
@@ -145,7 +143,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
@Override
public VectorContainer getOutgoingContainer() {
- return this.container;
+ return container;
}
@Override
@@ -204,14 +202,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
incomingRecordCount = incoming.getRecordCount();
memoryManager.update();
- logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}",
+ logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}",
memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this);
}
}
}
if (complexWriters != null && getLastKnownOutcome() == EMIT) {
- throw new UnsupportedOperationException("Currently functions producing complex types as output is not " +
+ throw new UnsupportedOperationException("Currently functions producing complex types as output are not " +
"supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
"function in the projection list of outermost query.");
}
@@ -219,7 +217,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
first = false;
container.zeroVectors();
-
int maxOuputRecordCount = memoryManager.getOutputRowCount();
logger.trace("doWork():[2] memMgr RC {}, incoming rc {}, incoming {}, project {}",
memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this);
@@ -233,7 +230,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
long projectEndTime = System.currentTimeMillis();
logger.trace("doWork(): projection: records {}, time {} ms", outputRecords, (projectEndTime - projectStartTime));
-
if (outputRecords < incomingRecordCount) {
setValueCount(outputRecords);
hasRemainder = true;
@@ -277,7 +273,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final int projRecords = projector.projectRecords(this.incoming, remainderIndex, recordsToProcess, 0);
long projectEndTime = System.currentTimeMillis();
- logger.trace("handleRemainder: projection: " + "records {}, time {} ms", projRecords,(projectEndTime - projectStartTime));
+ logger.trace("handleRemainder: projection: records {}, time {} ms", projRecords,(projectEndTime - projectStartTime));
if (projRecords < remainingRecordCount) {
setValueCount(projRecords);
@@ -463,7 +459,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
memoryManager.addNewField(vv, write);
- final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+ cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
}
}
continue;
@@ -546,7 +542,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
- final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+ cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
memoryManager.addNewField(ouputVector, write);
// We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
@@ -590,7 +586,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
private boolean isImplicitFileColumn(ValueVector vvIn) {
- return ColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null;
+ return columnExplorer.isImplicitFileColumn(vvIn.getField().getName());
}
private List<NamedExpression> getExpressionList() {
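The isImplicitFileColumn() change above swaps a per-call map rebuild for the ColumnExplorer cached in the constructor. A rough before/after sketch, using the names from this diff:

    // Before: rebuilt the implicit-column map on every call.
    boolean oldWay = ColumnExplorer.initImplicitFileColumns(context.getOptions())
        .get(name) != null;
    // After: reuses the ColumnExplorer created once per operator.
    boolean newWay = columnExplorer.isImplicitFileColumn(name);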
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index 966a03901..b9b3e782e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -24,7 +24,7 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.Scan
import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl;
import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput;
+import org.apache.drill.exec.store.easy.text.compliant.v3.TextReader;
/**
* Parses the `columns` array. Doing so is surprisingly complex.
@@ -113,14 +113,14 @@ public class ColumnsArrayParser implements ScanProjectionParser {
if (inCol.isArray()) {
int maxIndex = inCol.maxIndex();
- if (maxIndex > RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS) {
+ if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
throw UserException
.validationError()
.message(String.format(
"`columns`[%d] index out of bounds, max supported size is %d",
- maxIndex, RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS))
+ maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS))
.addContext("Column", inCol.name())
- .addContext("Maximum index", RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)
+ .addContext("Maximum index", TextReader.MAXIMUM_NUMBER_COLUMNS)
.build(logger);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
index 8352dfa07..1166a5234 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
@@ -69,6 +69,7 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
private List<FileSplit> spilts = new ArrayList<>();
private Iterator<FileSplit> splitIter;
private Path scanRootDir;
+ private int partitionDepth;
protected DrillFileSystem dfs;
private FileMetadataManager metadataManager;
@@ -82,12 +83,18 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
/**
* Specify the selection root for a directory scan, if any.
- * Used to populate partition columns.
+ * Used to populate partition columns. Also, specify the maximum
+ * partition depth.
+ *
* @param rootPath Hadoop file path for the directory
+ * @param partitionDepth maximum partition depth across all files
+ * within this logical scan operator (files in this scan may be
+ * shallower)
*/
- public void setSelectionRoot(Path rootPath) {
+ public void setSelectionRoot(Path rootPath, int partitionDepth) {
this.scanRootDir = rootPath;
+ this.partitionDepth = partitionDepth;
}
@Override
@@ -122,7 +129,10 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
metadataManager = new FileMetadataManager(
context.getFragmentContext().getOptions(),
+ true, // Expand partition columns with wildcard
+ false, // Put partition columns after table columns
scanRootDir,
+ partitionDepth,
paths);
scanOrchestrator.withMetadata(metadataManager);
}
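A hedged usage sketch of the revised method; "framework" and "rootPath" are assumed to be in scope, and the depth value is illustrative:

    // Files such as rootPath/2018/Q1/data.csv sit two directories below the
    // selection root, so dir0 and dir1 become available to the query.
    framework.setSelectionRoot(rootPath, 2);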
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
index ae8502b52..a4ace55be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
@@ -46,6 +46,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
private boolean hasImplicitCols;
+ private boolean expandPartitionsAtEnd;
+
public FileMetadataColumnsParser(FileMetadataManager metadataManager) {
this.metadataManager = metadataManager;
partitionPattern = Pattern.compile(metadataManager.partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE);
@@ -123,8 +125,10 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
}
private void buildWildcard() {
- if (metadataManager.useLegacyWildcardExpansion &&
- metadataManager.useLegacyExpansionLocation) {
+ if (!metadataManager.useLegacyWildcardExpansion) {
+ return;
+ }
+ if (metadataManager.useLegacyExpansionLocation) {
// Star column: this is a SELECT * query.
@@ -134,6 +138,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
// set is constant across all files.
expandPartitions();
+ } else {
+ expandPartitionsAtEnd = true;
}
}
@@ -144,8 +150,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
// feature to expand partitions for wildcards, and we want the
// partitions after data columns.
- if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion &&
- ! metadataManager.useLegacyExpansionLocation) {
+ if (expandPartitionsAtEnd) {
expandPartitions();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
index ba49a9f54..201d30859 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
@@ -41,24 +41,51 @@ import org.apache.hadoop.fs.Path;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+/**
+ * Manages the insertion of file metadata (AKA "implicit") and partition columns.
+ * Parses the file metadata columns from the projection list. Creates and loads
+ * the vectors that hold the data. If running in legacy mode, inserts partition
+ * columns when the query contains a wildcard. Supports renaming the columns via
+ * session options.
+ * <p>
+ * The lifecycle is that the manager is given the set of files for this scan
+ * operator so it can determine the partition depth. (Note that different scans
+ * may not agree on the depth. This is a known issue with Drill's implementation.)
+ * <p>
+ * Then, at the start of the scan, all projection columns are parsed. This class
+ * picks out the file metadata columns.
+ * <p>
+ * On each file (on each reader), the columns are "resolved." Here, that means
+ * that the columns are filled in with actual values based on the present file.
+ * <p>
+ * This is the successor to {@link ColumnExplorer}.
+ */
+
public class FileMetadataManager implements MetadataManager, SchemaProjectionResolver, VectorSource {
+ /**
+ * Automatically compute partition depth from files. Use only
+ * for testing!
+ */
+
+ public static final int AUTO_PARTITION_DEPTH = -1;
+
// Input
- private Path scanRootDir;
+ private final Path scanRootDir;
+ private final int partitionCount;
private FileMetadata currentFile;
// Config
protected final String partitionDesignator;
- protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>();
- protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap();
+ protected final List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>();
+ protected final Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap();
/**
* Indicates whether to expand partition columns when the query contains a wildcard.
* Supports queries such as the following:<code><pre>
- * select * from dfs.`partitioned-dir`
- * </pre><code>
+ * select * from dfs.`partitioned-dir`</pre></code>
* In which the output columns will be (columns, dir0) if the partitioned directory
* has one level of nesting.
*
@@ -86,7 +113,6 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
private final List<MetadataColumn> metadataColumns = new ArrayList<>();
private ConstantColumnLoader loader;
private VectorContainer outputContainer;
- private final int partitionCount;
/**
* Specifies whether to plan based on the legacy meaning of "*". See
@@ -111,10 +137,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
public FileMetadataManager(OptionSet optionManager,
boolean useLegacyWildcardExpansion,
boolean useLegacyExpansionLocation,
- Path rootDir, List<Path> files) {
+ Path rootDir,
+ int partitionCount,
+ List<Path> files) {
this.useLegacyWildcardExpansion = useLegacyWildcardExpansion;
this.useLegacyExpansionLocation = useLegacyExpansionLocation;
- scanRootDir = rootDir;
partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
for (ImplicitFileColumns e : ImplicitFileColumns.values()) {
@@ -129,28 +156,32 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
// The files and root dir are optional.
- if (scanRootDir == null || files == null) {
- partitionCount = 0;
+ if (rootDir == null || files == null) {
+ scanRootDir = null;
+ this.partitionCount = 0;
// Special case in which the file is the same as the
// root directory (occurs for a query with only one file.)
- } else if (files.size() == 1 && scanRootDir.equals(files.get(0))) {
+ } else if (files.size() == 1 && rootDir.equals(files.get(0))) {
scanRootDir = null;
- partitionCount = 0;
+ this.partitionCount = 0;
} else {
+ scanRootDir = rootDir;
- // Compute the partitions.
+ // Compute the partitions. Normally the count is passed in.
+ // But handle the case where the count is unknown. Note: use this
+ // convenience only in testing since, in production, it can result
+ // in different scans reporting different numbers of partitions.
- partitionCount = computeMaxPartition(files);
+ if (partitionCount == -1) {
+ this.partitionCount = computeMaxPartition(files);
+ } else {
+ this.partitionCount = partitionCount;
+ }
}
}
- public FileMetadataManager(OptionSet optionManager,
- Path rootDir, List<Path> files) {
- this(optionManager, false, false, rootDir, files);
- }
-
private int computeMaxPartition(List<Path> files) {
int maxLen = 0;
for (Path filePath : files) {
@@ -165,6 +196,17 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
this.vectorCache = vectorCache;
}
+ /**
+ * Returns the file metadata column parser that:
+ * <ul>
+ * <li>Picks out the file metadata and partition columns,</li>
+ * <li>Inserts partition columns for a wildcard query, if the
+ * option to do so is set.</li>
+ * </ul>
+ *
+ * @see #useLegacyWildcardExpansion
+ */
+
@Override
public ScanProjectionParser projectionParser() { return parser; }
@@ -223,6 +265,10 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
currentFile = null;
}
+ /**
+ * Resolves metadata columns to concrete, materialized columns with the
+ * proper value for the present file.
+ */
@Override
public boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple,
TupleMetadata tableSchema) {
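Putting the revised constructor together, a minimal construction sketch; the argument values are assumptions for illustration:

    FileMetadataManager mdManager = new FileMetadataManager(
        fragmentContext.getOptions(),  // OptionSet
        true,                          // useLegacyWildcardExpansion
        false,                         // useLegacyExpansionLocation
        scanRootDir,                   // Path; may be null
        partitionDepth,                // pass AUTO_PARTITION_DEPTH (-1) only in tests
        filePaths);                    // List<Path> of files in this scan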
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index a5d6ca2a1..926936550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -282,7 +282,9 @@ public class ScanSchemaOrchestrator {
ScanProjectionParser parser = metadataManager.projectionParser();
if (parser != null) {
- parsers.add(parser);
+ // Insert in the first position so that this parser is guaranteed
+ // to see any wildcard in the projection list
+ parsers.add(0, parser);
}
// Parse the projection list.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
index f109578e9..be45d569e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
@@ -87,17 +87,17 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
"width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth());
// 1.5 Cap the parallelization width by max allowed parallelization per node
- width = Math.max(1, Math.min(width, endpointPool.size()*parameters.getMaxWidthPerNode()));
+ width = Math.max(1, Math.min(width, endpointPool.size() * parameters.getMaxWidthPerNode()));
- // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is if we the width is more,
- // we end up allocating more work units to one or more endpoints that don't have those many work units.
+ // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is if the width is more,
+ // we end up allocating more work units to one or more endpoints that don't have that many work units.
width = Math.min(totalMaxWidth, width);
// Step 2: Select the endpoints
final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
// 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
- for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
+ for (Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
endpoints.put(entry.getKey(), 1);
}
int totalAssigned = endpoints.size();
@@ -105,15 +105,15 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
// 2.2 Assign the remaining slots to endpoints proportional to the affinity of each endpoint
int remainingSlots = width - endpoints.size();
while (remainingSlots > 0) {
- for(EndpointAffinity epAf : endpointPool.values()) {
+ for (EndpointAffinity epAf : endpointPool.values()) {
final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots);
int currentAssignments = endpoints.get(epAf.getEndpoint());
- for(int i=0;
- i < moreAllocation &&
+ for (int i=0;
+ i < moreAllocation &&
totalAssigned < width &&
currentAssignments < parameters.getMaxWidthPerNode() &&
currentAssignments < epAf.getMaxWidth();
- i++) {
+ i++) {
totalAssigned++;
currentAssignments++;
}
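To make the two capping steps and the assignment loop concrete, a small worked example with assumed numbers:

    // Assumed: 3 endpoints in the pool, maxWidthPerNode = 4, totalMaxWidth = 10,
    // requested width = 20.
    // Step 1.5: width = max(1, min(20, 3 * 4)) = 12
    // Step 1.6: width = min(10, 12)            = 10
    // Step 2.1: each endpoint gets one mandatory slot -> 3 assigned, 7 remain
    // Step 2.2: the 7 remaining slots are handed out in proportion to each
    //           endpoint's affinity, bounded by maxWidthPerNode and getMaxWidth().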
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 8de57bca0..39b699fe4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -179,7 +179,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) {
planningSet.get(rootFragment);
- for(ExchangeFragmentPair fragmentPair : rootFragment) {
+ for (ExchangeFragmentPair fragmentPair : rootFragment) {
initFragmentWrappers(fragmentPair.getNode(), planningSet);
}
}
@@ -193,7 +193,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
// Set up dependency of fragments based on the affinity of exchange that separates the fragments.
- for(Wrapper currentFragmentWrapper : planningSet) {
+ for (Wrapper currentFragmentWrapper : planningSet) {
ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair();
if (sendingExchange != null) {
ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency();
@@ -209,17 +209,17 @@ public class SimpleParallelizer implements ParallelizationParameters {
// Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for
// parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and
- // remove the fragment on which the current fragment depends on.
+ // remove the fragment on which the current fragment depends.
final Set<Wrapper> roots = Sets.newHashSet();
- for(Wrapper w : planningSet) {
+ for (Wrapper w : planningSet) {
roots.add(w);
}
- for(Wrapper wrapper : planningSet) {
+ for (Wrapper wrapper : planningSet) {
final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
- for(Wrapper dependency : fragmentDependencies) {
+ for (Wrapper dependency : fragmentDependencies) {
if (roots.contains(dependency)) {
roots.remove(dependency);
}
@@ -241,7 +241,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
return;
}
- // First parallelize fragments on which this fragment depends on.
+ // First parallelize fragments on which this fragment depends.
final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
for(Wrapper dependency : fragmentDependencies) {
@@ -288,20 +288,20 @@ public class SimpleParallelizer implements ParallelizationParameters {
Preconditions.checkArgument(op instanceof FragmentRoot);
FragmentRoot root = (FragmentRoot) op;
- FragmentHandle handle = FragmentHandle //
- .newBuilder() //
- .setMajorFragmentId(wrapper.getMajorFragmentId()) //
- .setMinorFragmentId(minorFragmentId) //
- .setQueryId(queryId) //
+ FragmentHandle handle = FragmentHandle
+ .newBuilder()
+ .setMajorFragmentId(wrapper.getMajorFragmentId())
+ .setMinorFragmentId(minorFragmentId)
+ .setQueryId(queryId)
.build();
- PlanFragment fragment = PlanFragment.newBuilder() //
- .setForeman(foremanNode) //
- .setHandle(handle) //
- .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) //
- .setLeafFragment(isLeafFragment) //
+ PlanFragment fragment = PlanFragment.newBuilder()
+ .setForeman(foremanNode)
+ .setHandle(handle)
+ .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId))
+ .setLeafFragment(isLeafFragment)
.setContext(queryContextInfo)
- .setMemInitial(wrapper.getInitialAllocation())//
+ .setMemInitial(wrapper.getInitialAllocation())
.setMemMax(wrapper.getMaxAllocation())
.setCredentials(session.getCredentials())
.addAllCollector(CountRequiredFragments.getCollectors(root))
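For the leaf-fragment pruning above, a short worked example with assumed fragments:

    // Assumed: wrapper A depends on B, and B depends on C.
    // Start:               roots = {A, B, C}
    // Visit A: remove B -> roots = {A, C}
    // Visit B: remove C -> roots = {A}
    // A is the only fragment no other fragment depends on, so parallelization
    // starts from it.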
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 760a6ba5b..a07d3ed58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -111,7 +111,7 @@ public class ScanPrel extends DrillScanRelBase implements Prel, HasDistributionA
final ScanStats stats = this.getGroupScan().getScanStats(settings);
final int columnCount = this.getRowType().getFieldCount();
- if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
index b4fada61f..4d702a830 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
@@ -30,8 +30,8 @@ public class ScanPrule extends Prule{
public ScanPrule() {
super(RelOptHelper.any(DrillScanRel.class), "Prel.ScanPrule");
-
}
+
@Override
public void onMatch(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(0);
@@ -48,5 +48,4 @@ public class ScanPrule extends Prule{
call.transformTo(newScan);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index fa8e69d0b..267cecd4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -159,13 +159,13 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
class MajorFragmentStat {
private DistributionAffinity distributionAffinity = DistributionAffinity.NONE;
- private double maxRows = 0d;
+ private double maxRows;
private int maxWidth = Integer.MAX_VALUE;
- private boolean isMultiSubScan = false;
- private boolean rightSideOfLateral = false;
+ private boolean isMultiSubScan;
+ private boolean rightSideOfLateral;
//This flag if true signifies that all the Rels thus far
//are simple rels with no distribution requirement.
- private boolean isSimpleRel = false;
+ private boolean isSimpleRel;
public void add(Prel prel) {
maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -196,7 +196,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
return false;
}
- int suggestedWidth = (int) Math.ceil((maxRows+1)/targetSliceSize);
+ int suggestedWidth = (int) Math.ceil((maxRows + 1) / targetSliceSize);
int w = Math.min(maxWidth, suggestedWidth);
if (w < 1) {
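A quick check of the suggested-width formula with assumed numbers:

    // Assumed: maxRows = 250,000 and targetSliceSize = 100,000.
    // suggestedWidth = ceil((250,000 + 1) / 100,000) = ceil(2.50001) = 3
    // w = min(maxWidth, suggestedWidth)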
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9cd670e99..e201e2686 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -242,6 +242,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR),
new OptionDefinition(ExecConstants.SCALAR_REPLACEMENT_VALIDATOR),
new OptionDefinition(ExecConstants.ENABLE_NEW_TEXT_READER),
+ new OptionDefinition(ExecConstants.ENABLE_V3_TEXT_READER),
+ new OptionDefinition(ExecConstants.MIN_READER_WIDTH),
new OptionDefinition(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST),
new OptionDefinition(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE),
new OptionDefinition(ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index 33b500018..1ae2d5e9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -55,14 +55,30 @@ public class ColumnExplorer {
*/
public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
- this.columns = columns;
- this.isStarQuery = columns != null && Utilities.isStarQuery(columns);
this.selectedPartitionColumns = Lists.newArrayList();
this.tableColumns = Lists.newArrayList();
this.allImplicitColumns = initImplicitFileColumns(optionManager);
this.selectedImplicitColumns = CaseInsensitiveMap.newHashMap();
+ if (columns == null) {
+ isStarQuery = false;
+ this.columns = null;
+ } else {
+ this.columns = columns;
+ this.isStarQuery = Utilities.isStarQuery(columns);
+ init();
+ }
+ }
- init();
+ /**
+ * Constructor for using the column explorer to probe existing columns in the
+ * {@link ProjectRecordBatch}.
+ */
+ // TODO: This is awkward. This class is being used for two distinct things:
+ // 1. The definition of the metadata columns, and
+ // 2. The projection of metadata columns in a particular query.
+ // Would be better to separate these two concepts.
+ public ColumnExplorer(OptionManager optionManager) {
+ this(optionManager, null);
}
/**
@@ -123,6 +139,15 @@ public class ColumnExplorer {
return matcher.matches();
}
+ public boolean isImplicitColumn(String name) {
+ return isPartitionColumn(partitionDesignator, name) ||
+ isImplicitFileColumn(name);
+ }
+
+ public boolean isImplicitFileColumn(String name) {
+ return allImplicitColumns.get(name) != null;
+ }
+
/**
* Returns list with partition column names.
* For the case when table has several levels of nesting, max level is chosen.
@@ -132,10 +157,24 @@ public class ColumnExplorer {
* @return list with partition column names.
*/
public static List<String> getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) {
- int partitionsCount = 0;
+ int partitionsCount = getPartitionDepth(selection);
+
+ String partitionColumnLabel = schemaConfig.getOption(
+ ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+ List<String> partitions = new ArrayList<>();
+
+ // generates partition column names: dir0, dir1 etc.
+ for (int i = 0; i < partitionsCount; i++) {
+ partitions.add(partitionColumnLabel + i);
+ }
+ return partitions;
+ }
+
+ public static int getPartitionDepth(FileSelection selection) {
// a depth of table root path
int rootDepth = selection.getSelectionRoot().depth();
+ int partitionsCount = 0;
for (Path file : selection.getFiles()) {
// Calculates partitions count for the concrete file:
// depth of file path - depth of table root path - 1.
@@ -145,15 +184,7 @@ public class ColumnExplorer {
// max depth of files path should be used to handle all partitions
partitionsCount = Math.max(partitionsCount, currentPartitionsCount);
}
-
- String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
- List<String> partitions = new ArrayList<>();
-
- // generates partition column names: dir0, dir1 etc.
- for (int i = 0; i < partitionsCount; i++) {
- partitions.add(partitionColumnLabel + i);
- }
- return partitions;
+ return partitionsCount;
}
/**
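A hedged sketch of the new probing APIs; "optionManager" and the FileSelection "selection" are assumed to be in scope:

    ColumnExplorer explorer = new ColumnExplorer(optionManager);
    explorer.isImplicitFileColumn("filename");  // true: built-in implicit column
    explorer.isImplicitColumn("dir0");          // true: partition label + index
    int depth = ColumnExplorer.getPartitionDepth(selection);  // max dirN depth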
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 13017fadc..254cddbe3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -73,5 +73,4 @@ public interface FormatPlugin {
Configuration getFsConf();
DrillbitContext getContext();
String getName();
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d76c6489e..dc1f053a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.drill.shaded.guava.com.google.common.base.Functions;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -39,10 +40,16 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
@@ -55,67 +62,335 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+/**
+ * Base class for various file format plugins.
+ * <p>
+ * This version provides a bridge between the legacy {@link RecordReader}-style
+ * readers and the newer {@link FileBatchReader} style. Over time, we should
+ * split the class or provide a cleaner way to handle the differences.
+ *
+ * @param <T> the format plugin config for this reader
+ */
+
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
- @SuppressWarnings("unused")
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+ /**
+ * Defines the static, programmer-defined options for this plugin. These
+ * options are attributes of how the plugin works. The plugin config,
+ * defined in the class definition, provides user-defined options that can
+ * vary across uses of the plugin.
+ */
+
+ public static class EasyFormatConfig {
+ public BasicFormatMatcher matcher;
+ public boolean readable = true;
+ public boolean writable;
+ public boolean blockSplittable;
+ public boolean compressible;
+ public Configuration fsConf;
+ public List<String> extensions;
+ public String defaultName;
+
+ // Config options that, prior to Drill 1.15, required the plugin to
+ // override methods. Moving forward, plugins should be migrated to
+ // use this simpler form. New plugins should use these options
+ // instead of overriding methods.
+
+ public boolean supportsProjectPushdown;
+ public boolean supportsAutoPartitioning;
+ public int readerOperatorType = -1;
+ public int writerOperatorType = -1;
+ }
+
+ /**
+ * Creates the scan batch to use with the plugin. Drill supports the "classic"
+ * style of scan batch and readers, along with the newer size-aware,
+ * component-based version. The implementation of this class assembles the
+ * readers and scan batch operator as needed for each version.
+ */
+
+ public interface ScanBatchCreator {
+ CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan)
+ throws ExecutionSetupException;
+ }
+
+ /**
+ * Use the original scanner based on the {@link RecordReader} interface.
+ * Requires that the storage plugin roll its own solutions for null columns.
+ * Is not able to limit vector or batch sizes. Retained for backward
+ * compatibility with "classic" format plugins which have not yet been
+ * upgraded to use the new framework.
+ */
+
+ public static class ClassicScanBatchCreator implements ScanBatchCreator {
+
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ClassicScanBatchCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.plugin = plugin;
+ }
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+ final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
+
+ if (! columnExplorer.isStarQuery()) {
+ scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
+ columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth());
+ scan.setOperatorId(scan.getOperatorId());
+ }
+
+ final OperatorContext oContext = context.newOperatorContext(scan);
+ final DrillFileSystem dfs;
+ try {
+ dfs = oContext.newFileSystem(plugin.easyConfig().fsConf);
+ } catch (final IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+ }
+
+ final List<RecordReader> readers = new LinkedList<>();
+ final List<Map<String, String>> implicitColumns = Lists.newArrayList();
+ Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+ final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
+ for (final FileWork work : scan.getWorkUnits()) {
+ final RecordReader recordReader = getRecordReader(
+ plugin, context, dfs, work, scan.getColumns(), scan.getUserName());
+ readers.add(recordReader);
+ final List<String> partitionValues = ColumnExplorer.listPartitionValues(
+ work.getPath(), scan.getSelectionRoot(), false);
+ final Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(
+ work.getPath(), partitionValues, supportsFileImplicitColumns);
+ implicitColumns.add(implicitValues);
+ if (implicitValues.size() > mapWithMaxColumns.size()) {
+ mapWithMaxColumns = implicitValues;
+ }
+ }
+
+ // all readers should have the same number of implicit columns, add missing ones with value null
+ final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+ for (final Map<String, String> map : implicitColumns) {
+ map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+ }
+
+ return new ScanBatch(context, oContext, readers, implicitColumns);
+ }
+
+ /**
+ * Create a record reader given a file system, a file description and other
+ * information. For backward compatibility, calls the plugin method by
+ * default.
+ *
+ * @param plugin
+ * the plugin creating the scan
+ * @param context
+ * fragment context for the fragment running the scan
+ * @param dfs
+ * Drill's distributed file system facade
+ * @param fileWork
+ * description of the file to scan
+ * @param columns
+ * list of columns to project
+ * @param userName
+ * the name of the user performing the scan
+ * @return a scan operator
+ * @throws ExecutionSetupException
+ * if anything goes wrong
+ */
+
+ public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+ FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ return plugin.getRecordReader(context, dfs, fileWork, columns, userName);
+ }
+ }
+
+ /**
+ * Revised scanner based on the revised
+ * {@link ResultSetLoader} and {@link RowBatchReader} classes.
+ * Handles most projection tasks automatically. Able to limit
+ * vector and batch sizes. Use this for new format plugins.
+ */
+
+ public abstract static class ScanFrameworkCreator
+ implements ScanBatchCreator {
+
+ protected EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.plugin = plugin;
+ }
+
+ /**
+ * Builds the revised {@link FileBatchReader}-based scan batch.
+ *
+ * @param context fragment context for the fragment running the scan
+ * @param scan the Easy sub-scan definition, including the files to read
+ * @return the record batch for the revised scan operator
+ * @throws ExecutionSetupException if the scan framework cannot be created
+ */
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context,
+ final EasySubScan scan) throws ExecutionSetupException {
+
+ // Assemble the scan operator and its wrapper.
+
+ try {
+ final BaseFileScanFramework<?> framework = buildFramework(scan);
+ final Path selectionRoot = scan.getSelectionRoot();
+ if (selectionRoot != null) {
+ framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth());
+ }
+ return new OperatorRecordBatch(
+ context, scan,
+ new ScanOperatorExec(
+ framework));
+ } catch (final UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (final Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ /**
+ * Create the plugin-specific framework that manages the scan. The framework
+ * creates batch readers one by one for each file or block. It defines semantic
+ * rules for projection. It handles "early" or "late" schema readers. A typical
+ * framework builds on standardized frameworks for files in general or text
+ * files in particular.
+ *
+ * @param scan the physical operation definition for the scan operation. Contains
+ * one or more files to read. (The Easy format plugin works only for files.)
+ * @return the scan framework which orchestrates the scan operation across
+ * potentially many files
+ * @throws ExecutionSetupException for all setup failures
+ */
+ protected abstract BaseFileScanFramework<?> buildFramework(
+ EasySubScan scan) throws ExecutionSetupException;
+ }
+
+ /**
+ * Generic framework creator for files that just use the basic file
+ * support: metadata, etc. Specialized use cases (special "columns"
+ * column, say) will require a specialized implementation.
+ */
+
+ public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator {
+
+ private final FileReaderFactory readerCreator;
+
+ public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+ FileReaderFactory readerCreator) {
+ super(plugin);
+ this.readerCreator = readerCreator;
+ }
+
+ @Override
+ protected FileScanFramework buildFramework(
+ EasySubScan scan) throws ExecutionSetupException {
- private final BasicFormatMatcher matcher;
+ final FileScanFramework framework = new FileScanFramework(
+ scan.getColumns(),
+ scan.getWorkUnits(),
+ plugin.easyConfig().fsConf,
+ readerCreator);
+ return framework;
+ }
+ }
+
+ private final String name;
+ private final EasyFormatConfig easyConfig;
private final DrillbitContext context;
- private final boolean readable;
- private final boolean writable;
- private final boolean blockSplittable;
- private final Configuration fsConf;
private final StoragePluginConfig storageConfig;
protected final T formatConfig;
- private final String name;
- private final boolean compressible;
+ /**
+ * Legacy constructor.
+ */
protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
- boolean compressible, List<String> extensions, String defaultName){
- this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
- this.readable = readable;
- this.writable = writable;
+ StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable,
+ boolean blockSplittable,
+ boolean compressible, List<String> extensions, String defaultName) {
+ this.name = name == null ? defaultName : name;
+ easyConfig = new EasyFormatConfig();
+ easyConfig.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
+ easyConfig.readable = readable;
+ easyConfig.writable = writable;
this.context = context;
- this.blockSplittable = blockSplittable;
- this.compressible = compressible;
- this.fsConf = fsConf;
+ easyConfig.blockSplittable = blockSplittable;
+ easyConfig.compressible = compressible;
+ easyConfig.fsConf = fsConf;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
- this.name = name == null ? defaultName : name;
}
- @Override
- public Configuration getFsConf() {
- return fsConf;
+ /**
+ * Revised constructor in which settings are gathered into a configuration object.
+ *
+ * @param name name of the plugin
+ * @param config configuration options for this plugin which determine
+ * developer-defined runtime behavior
+ * @param context the global server-wide drillbit context
+ * @param storageConfig the configuration for the storage plugin that owns this
+ * format plugin
+ * @param formatConfig the Jackson-serialized format configuration as created
+ * by the user in the Drill web console. Holds user-defined options.
+ */
+
+ protected EasyFormatPlugin(String name, EasyFormatConfig config, DrillbitContext context,
+ StoragePluginConfig storageConfig, T formatConfig) {
+ this.name = name;
+ this.easyConfig = config;
+ this.context = context;
+ this.storageConfig = storageConfig;
+ this.formatConfig = formatConfig;
+ if (easyConfig.matcher == null) {
+ easyConfig.matcher = new BasicFormatMatcher(this,
+ easyConfig.fsConf, easyConfig.extensions,
+ easyConfig.compressible);
+ }
}
@Override
- public DrillbitContext getContext() {
- return context;
- }
+ public Configuration getFsConf() { return easyConfig.fsConf; }
@Override
- public String getName() {
- return name;
- }
+ public DrillbitContext getContext() { return context; }
- public abstract boolean supportsPushDown();
+ public EasyFormatConfig easyConfig() { return easyConfig; }
+
+ @Override
+ public String getName() { return name; }
/**
- * Whether or not you can split the format based on blocks within file boundaries. If not, the simple format engine will
- * only split on file boundaries.
+ * Does this plugin support projection push-down? That is, can the reader
+ * itself handle the tasks of projecting table columns, creating null
+ * columns for missing table columns, and so on?
*
- * @return True if splittable.
+ * @return <tt>true</tt> if the plugin supports projection push-down,
+ * <tt>false</tt> if Drill should do the task by adding a project operator
*/
- public boolean isBlockSplittable() {
- return blockSplittable;
- }
+
+ public boolean supportsPushDown() { return easyConfig.supportsProjectPushdown; }
+
+ /**
+ * Whether or not you can split the format based on blocks within file
+ * boundaries. If not, the simple format engine will only split on file
+ * boundaries.
+ *
+ * @return <code>true</code> if splittable.
+ */
+ public boolean isBlockSplittable() { return easyConfig.blockSplittable; }
/**
* Indicates whether or not this format could also be in a compression
@@ -125,52 +400,40 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
*
* @return <code>true</code> if it is compressible
*/
- public boolean isCompressible() {
- return compressible;
- }
-
- public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
- List<SchemaPath> columns, String userName) throws ExecutionSetupException;
+ public boolean isCompressible() { return easyConfig.compressible; }
- CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
- final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
-
- if (!columnExplorer.isStarQuery()) {
- scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
- columnExplorer.getTableColumns(), scan.getSelectionRoot());
- scan.setOperatorId(scan.getOperatorId());
- }
+ /**
+ * Return a record reader for the specific file format, when using the original
+ * {@link ScanBatch} scanner.
+ * @param context fragment context
+ * @param dfs Drill file system
+ * @param fileWork metadata about the file to be scanned
+ * @param columns list of projected columns (or may just contain the wildcard)
+ * @param userName the name of the user running the query
+ * @return a record reader for this format
+ * @throws ExecutionSetupException if the record reader cannot be created
+ */
- OperatorContext oContext = context.newOperatorContext(scan);
- final DrillFileSystem dfs;
- try {
- dfs = oContext.newFileSystem(fsConf);
- } catch (IOException e) {
- throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
- }
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ throw new ExecutionSetupException("Must implement getRecordReader() if using the legacy scanner.");
+ }
- List<RecordReader> readers = new LinkedList<>();
- List<Map<String, String>> implicitColumns = Lists.newArrayList();
- Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
- boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
- for (FileWork work : scan.getWorkUnits()){
- RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
- readers.add(recordReader);
- List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false);
- Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
- implicitColumns.add(implicitValues);
- if (implicitValues.size() > mapWithMaxColumns.size()) {
- mapWithMaxColumns = implicitValues;
- }
- }
+ protected CloseableRecordBatch getReaderBatch(final FragmentContext context,
+ final EasySubScan scan) throws ExecutionSetupException {
+ return scanBatchCreator(context.getOptions()).buildScan(context, scan);
+ }
- // all readers should have the same number of implicit columns, add missing ones with value null
- Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
- for (Map<String, String> map : implicitColumns) {
- map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
- }
+ /**
+ * Create the scan batch creator. Needed only when using the revised
+ * scan framework. In that case, override the <tt>readerIterator()</tt>
+ * method on the custom scan batch creator implementation.
+ *
+ * @return the strategy for creating the scan batch for this plugin
+ */
- return new ScanBatch(context, oContext, readers, implicitColumns);
+ protected ScanBatchCreator scanBatchCreator(OptionManager options) {
+ return new ClassicScanBatchCreator(this);
}
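+
+ // Illustrative sketch only: a plugin that adopts the revised scan
+ // framework overrides this method to return its own creator, as
+ // TextFormatPlugin.scanBatchCreator() later in this patch does.
+ // MyScanCreator is a hypothetical subclass of ScanFrameworkCreator:
+ //
+ // @Override
+ // protected ScanBatchCreator scanBatchCreator(OptionManager options) {
+ // return new MyScanCreator(this);
+ // }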
public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) {
@@ -219,41 +482,28 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
@Override
- public T getConfig() {
- return formatConfig;
- }
+ public T getConfig() { return formatConfig; }
@Override
- public StoragePluginConfig getStorageConfig() {
- return storageConfig;
- }
+ public StoragePluginConfig getStorageConfig() { return storageConfig; }
@Override
- public boolean supportsRead() {
- return readable;
- }
+ public boolean supportsRead() { return easyConfig.readable; }
@Override
- public boolean supportsWrite() {
- return writable;
- }
+ public boolean supportsWrite() { return easyConfig.writable; }
@Override
- public boolean supportsAutoPartitioning() {
- return false;
- }
+ public boolean supportsAutoPartitioning() { return easyConfig.supportsAutoPartitioning; }
@Override
- public FormatMatcher getMatcher() {
- return matcher;
- }
+ public FormatMatcher getMatcher() { return easyConfig.matcher; }
@Override
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of();
}
- public abstract int getReaderOperatorType();
- public abstract int getWriterOperatorType();
-
+ public int getReaderOperatorType() { return easyConfig.readerOperatorType; }
+ public int getWriterOperatorType() { return easyConfig.writerOperatorType; }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 4449ec054..6a6243cd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -57,9 +58,11 @@ import org.apache.hadoop.fs.Path;
public class EasyGroupScan extends AbstractFileGroupScan {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
- private FileSelection selection;
private final EasyFormatPlugin<?> formatPlugin;
+ private FileSelection selection;
+ private int partitionDepth;
private int maxWidth;
+ private int minWidth = 1;
private List<SchemaPath> columns;
private ListMultimap<Integer, CompleteFileWork> mappings;
@@ -104,9 +107,23 @@ public class EasyGroupScan extends AbstractFileGroupScan {
initFromSelection(selection, formatPlugin);
}
- @JsonIgnore
- public Iterable<CompleteFileWork> getWorkIterable() {
- return () -> Iterators.unmodifiableIterator(chunks.iterator());
+ public EasyGroupScan(
+ String userName,
+ FileSelection selection,
+ EasyFormatPlugin<?> formatPlugin,
+ List<SchemaPath> columns,
+ Path selectionRoot,
+ int minWidth
+ ) throws IOException {
+ this(userName, selection, formatPlugin, columns, selectionRoot);
+
+ // Set the minimum width of this reader. Primarily used for testing
+ // to force parallelism even for small test files.
+ // See ExecConstants.MIN_READER_WIDTH
+ this.minWidth = Math.max(1, Math.min(minWidth, maxWidth));
+
+ // Compute the maximum partition depth across all files.
+ partitionDepth = ColumnExplorer.getPartitionDepth(selection);
}
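+
+ // Worked example: with three file chunks (maxWidth = 3), a requested
+ // minWidth of 10 clamps to 3 and a requested 0 clamps to 1. And for a
+ // file at /data/2018/q1/f.csv under selection root /data, the partition
+ // depth is 2 (implicit columns dir0 = 2018, dir1 = q1).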
private EasyGroupScan(final EasyGroupScan that) {
@@ -118,17 +135,23 @@ public class EasyGroupScan extends AbstractFileGroupScan {
chunks = that.chunks;
endpointAffinities = that.endpointAffinities;
maxWidth = that.maxWidth;
+ minWidth = that.minWidth;
mappings = that.mappings;
+ partitionDepth = that.partitionDepth;
+ }
+
+ @JsonIgnore
+ public Iterable<CompleteFileWork> getWorkIterable() {
+ return () -> Iterators.unmodifiableIterator(chunks.iterator());
}
private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
- @SuppressWarnings("resource")
final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
this.selection = selection;
BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable());
- this.maxWidth = chunks.size();
- this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable());
+ maxWidth = chunks.size();
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
public Path getSelectionRoot() {
@@ -136,11 +159,16 @@ public class EasyGroupScan extends AbstractFileGroupScan {
}
@Override
+ @JsonIgnore
+ public int getMinParallelizationWidth() {
+ return minWidth;
+ }
+
+ @Override
public int getMaxParallelizationWidth() {
return maxWidth;
}
-
@Override
public ScanStats getScanStats(final PlannerSettings settings) {
return formatPlugin.getScanStats(settings, this);
@@ -163,7 +191,6 @@ public class EasyGroupScan extends AbstractFileGroupScan {
return columns;
}
-
@JsonIgnore
public FileSelection getFileSelection() {
return selection;
@@ -180,7 +207,6 @@ public class EasyGroupScan extends AbstractFileGroupScan {
return new EasyGroupScan(this);
}
-
@Override
public List<EndpointAffinity> getOperatorAffinity() {
if (endpointAffinities == null) {
@@ -217,7 +243,8 @@ public class EasyGroupScan extends AbstractFileGroupScan {
Preconditions.checkArgument(!filesForMinor.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
+ EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
+ columns, selectionRoot, partitionDepth);
subScan.setOperatorId(this.getOperatorId());
return subScan;
}
@@ -275,5 +302,4 @@ public class EasyGroupScan extends AbstractFileGroupScan {
public boolean canPushdownProjects(List<SchemaPath> columns) {
return formatPlugin.supportsPushDown();
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index fbb3f475c..c51c7ac0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -42,7 +42,8 @@ public class EasySubScan extends AbstractSubScan{
private final List<FileWorkImpl> files;
private final EasyFormatPlugin<?> formatPlugin;
private final List<SchemaPath> columns;
- private Path selectionRoot;
+ private final Path selectionRoot;
+ private final int partitionDepth;
@JsonCreator
public EasySubScan(
@@ -52,7 +53,8 @@ public class EasySubScan extends AbstractSubScan{
@JsonProperty("format") FormatPluginConfig formatConfig,
@JacksonInject StoragePluginRegistry engineRegistry,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("selectionRoot") Path selectionRoot
+ @JsonProperty("selectionRoot") Path selectionRoot,
+ @JsonProperty("partitionDepth") int partitionDepth
) throws ExecutionSetupException {
super(userName);
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
@@ -60,50 +62,40 @@ public class EasySubScan extends AbstractSubScan{
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
+ this.partitionDepth = partitionDepth;
}
- public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
- Path selectionRoot){
+ public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin,
+ List<SchemaPath> columns, Path selectionRoot, int partitionDepth) {
super(userName);
this.formatPlugin = plugin;
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
+ this.partitionDepth = partitionDepth;
}
@JsonProperty
- public Path getSelectionRoot() {
- return selectionRoot;
- }
+ public Path getSelectionRoot() { return selectionRoot; }
+
+ @JsonProperty
+ public int getPartitionDepth() { return partitionDepth; }
@JsonIgnore
- public EasyFormatPlugin<?> getFormatPlugin(){
- return formatPlugin;
- }
+ public EasyFormatPlugin<?> getFormatPlugin() { return formatPlugin; }
@JsonProperty("files")
- public List<FileWorkImpl> getWorkUnits() {
- return files;
- }
+ public List<FileWorkImpl> getWorkUnits() { return files; }
@JsonProperty("storage")
- public StoragePluginConfig getStorageConfig(){
- return formatPlugin.getStorageConfig();
- }
+ public StoragePluginConfig getStorageConfig() { return formatPlugin.getStorageConfig(); }
@JsonProperty("format")
- public FormatPluginConfig getFormatConfig(){
- return formatPlugin.getConfig();
- }
+ public FormatPluginConfig getFormatConfig() { return formatPlugin.getConfig(); }
@JsonProperty("columns")
- public List<SchemaPath> getColumns(){
- return columns;
- }
+ public List<SchemaPath> getColumns() { return columns; }
@Override
- public int getOperatorType() {
- return formatPlugin.getReaderOperatorType();
- }
-
+ public int getOperatorType() { return formatPlugin.getReaderOperatorType(); }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 03ae6f554..05ed1b564 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -18,34 +18,45 @@
package org.apache.drill.exec.store.easy.text;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.FileReaderCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader;
import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings;
+import org.apache.drill.exec.store.easy.text.compliant.v3.CompliantTextBatchReader;
+import org.apache.drill.exec.store.easy.text.compliant.v3.TextParsingSettingsV3;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
import org.apache.drill.exec.store.text.DrillTextRecordWriter;
@@ -61,91 +72,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
- private final static String DEFAULT_NAME = "text";
+public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig>
+ implements FileReaderCreator {
+ private static final String PLUGIN_NAME = "text";
- public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
- super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true,
- Collections.emptyList(), DEFAULT_NAME);
- }
-
- public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
- TextFormatConfig formatPluginConfig) {
- super(name, context, fsConf, config, formatPluginConfig, true, false, true, true,
- formatPluginConfig.getExtensions(), DEFAULT_NAME);
- }
-
-
- @Override
- public RecordReader getRecordReader(FragmentContext context,
- DrillFileSystem dfs,
- FileWork fileWork,
- List<SchemaPath> columns,
- String userName) {
- Path path = dfs.makeQualified(fileWork.getPath());
- FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
-
- if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
- TextParsingSettings settings = new TextParsingSettings();
- settings.set(formatConfig);
- return new CompliantTextRecordReader(split, dfs, settings, columns);
- } else {
- char delim = formatConfig.getFieldDelimiter();
- return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
- }
- }
-
- @Override
- public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
- throws IOException {
- return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
- }
-
- @Override
- public boolean supportsStatistics() {
- return false;
- }
-
- @Override
- public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) {
- long data = 0;
- for (final CompleteFileWork work : scan.getWorkIterable()) {
- data += work.getTotalBytes();
- }
- final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
- final double estRowCount = data / estimatedRowSize;
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
- }
-
- @Override
- public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
- final Map<String, String> options = new HashMap<>();
-
- options.put("location", writer.getLocation());
- FragmentHandle handle = context.getHandle();
- String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
- options.put("prefix", fragmentId);
- options.put("separator", getConfig().getFieldDelimiterAsString());
- options.put("extension", getConfig().getExtensions().get(0));
-
- RecordWriter recordWriter = new DrillTextRecordWriter(
- context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
- recordWriter.init(options);
-
- return recordWriter;
- }
-
- @JsonTypeName("text") @JsonInclude(Include.NON_DEFAULT)
+ @JsonTypeName(PLUGIN_NAME)
+ @JsonInclude(Include.NON_DEFAULT)
public static class TextFormatConfig implements FormatPluginConfig {
public List<String> extensions = ImmutableList.of();
@@ -157,34 +89,18 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
public boolean skipFirstLine = false;
public boolean extractHeader = false;
- public List<String> getExtensions() {
- return extensions;
- }
-
- public char getQuote() {
- return quote;
- }
-
- public char getEscape() {
- return escape;
- }
+ public TextFormatConfig() { }
- public char getComment() {
- return comment;
- }
-
- public String getLineDelimiter() {
- return lineDelimiter;
- }
-
- public char getFieldDelimiter() {
- return fieldDelimiter;
- }
+ public List<String> getExtensions() { return extensions; }
+ public char getQuote() { return quote; }
+ public char getEscape() { return escape; }
+ public char getComment() { return comment; }
+ public String getLineDelimiter() { return lineDelimiter; }
+ public char getFieldDelimiter() { return fieldDelimiter; }
+ public boolean isSkipFirstLine() { return skipFirstLine; }
@JsonIgnore
- public boolean isHeaderExtractionEnabled() {
- return extractHeader;
- }
+ public boolean isHeaderExtractionEnabled() { return extractHeader; }
@JsonIgnore
public String getFieldDelimiterAsString(){
@@ -197,10 +113,6 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
this.fieldDelimiter = delimiter;
}
- public boolean isSkipFirstLine() {
- return skipFirstLine;
- }
-
@Override
public int hashCode() {
final int prime = 31;
@@ -262,24 +174,173 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
}
return true;
}
+ }
+
+ public static class TextScanBatchCreator extends ScanFrameworkCreator {
+
+ private final FileReaderCreator readerCreator;
+ private final TextFormatPlugin textPlugin;
+
+ public TextScanBatchCreator(TextFormatPlugin plugin,
+ FileReaderCreator readerCreator) {
+ super(plugin);
+ this.readerCreator = readerCreator;
+ textPlugin = plugin;
+ }
+
+ @Override
+ protected ColumnsScanFramework buildFramework(
+ EasySubScan scan) throws ExecutionSetupException {
+ ColumnsScanFramework framework = new ColumnsScanFramework(
+ scan.getColumns(),
+ scan.getWorkUnits(),
+ plugin.easyConfig().fsConf,
+ readerCreator);
+
+ // If this format has no headers, or wants to skip them,
+ // then we must use the columns column to hold the data.
+
+ framework.requireColumnsArray(
+ ! textPlugin.getConfig().isHeaderExtractionEnabled());
+
+ // Text files handle nulls in an unusual way. Missing columns
+ // are set to required Varchar and filled with blanks. Yes, this
+ // means that the SQL statement or code cannot differentiate missing
+ // columns from empty columns, but that is how CSV and other text
+ // files have been defined within Drill.
+
+ framework.setNullType(
+ MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.REQUIRED)
+ .build());
+
+ return framework;
+ }
+ }
+
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+ this(name, context, fsConf, storageConfig, new TextFormatConfig());
+ }
+
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+ TextFormatConfig formatPluginConfig) {
+ super(name, easyConfig(fsConf, formatPluginConfig), context, config, formatPluginConfig);
+ }
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf, TextFormatConfig pluginConfig) {
+ EasyFormatConfig config = new EasyFormatConfig();
+ config.readable = true;
+ config.writable = true;
+ config.blockSplittable = true;
+ config.compressible = true;
+ config.supportsProjectPushdown = true;
+ config.extensions = pluginConfig.getExtensions();
+ config.fsConf = fsConf;
+ config.defaultName = PLUGIN_NAME;
+ config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE;
+ config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE;
+ return config;
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+ throws IOException {
+ return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns, OptionManager options) throws IOException {
+ return new EasyGroupScan(userName, selection, this, columns,
+ selection.selectionRoot,
+ // Some paths provide a null option manager. In that case, default to a
+ // min width of 1, just as the base class does.
+ options == null ? 1 : (int) options.getLong(ExecConstants.MIN_READER_WIDTH_KEY));
+ }
+
+ @Override
+ protected ScanBatchCreator scanBatchCreator(OptionManager options) {
+ // Create the "legacy", "V2" reader or the new "V3" version based on
+ // the result set loader. This code should be temporary: the two
+ // readers provide identical functionality for the user; only the
+ // internals differ.
+ if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) {
+ return new TextScanBatchCreator(this, this);
+ } else {
+ return new ClassicScanBatchCreator(this);
+ }
+ }
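+
+ // Usage note: the V3 reader is enabled per session via the option
+ // behind ExecConstants.ENABLE_V3_TEXT_READER_KEY; assuming the
+ // conventional option name, that is:
+ //
+ // ALTER SESSION SET `exec.storage.enable_v3_text_reader` = true;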
+
+ // TODO: Remove this once the V2 reader is removed.
+ @Override
+ public RecordReader getRecordReader(FragmentContext context,
+ DrillFileSystem dfs,
+ FileWork fileWork,
+ List<SchemaPath> columns,
+ String userName) {
+ Path path = dfs.makeQualified(fileWork.getPath());
+ FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
+
+ if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
+ TextParsingSettings settings = new TextParsingSettings();
+ settings.set(formatConfig);
+ return new CompliantTextRecordReader(split, dfs, settings, columns);
+ } else {
+ char delim = formatConfig.getFieldDelimiter();
+ return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
+ }
+ }
+ @Override
+ public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
+ final Map<String, String> options = new HashMap<>();
+ options.put("location", writer.getLocation());
+ FragmentHandle handle = context.getHandle();
+ String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+ options.put("prefix", fragmentId);
+ options.put("separator", getConfig().getFieldDelimiterAsString());
+ options.put("extension", getConfig().getExtensions().get(0));
+ RecordWriter recordWriter = new DrillTextRecordWriter(
+ context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
+ recordWriter.init(options);
+
+ return recordWriter;
}
@Override
- public int getReaderOperatorType() {
- return CoreOperatorType.TEXT_SUB_SCAN_VALUE;
+ public ManagedReader<ColumnsSchemaNegotiator> makeBatchReader(
+ DrillFileSystem dfs,
+ FileSplit split) throws ExecutionSetupException {
+ TextParsingSettingsV3 settings = new TextParsingSettingsV3();
+ settings.set(getConfig());
+ return new CompliantTextBatchReader(split, dfs, settings);
+ }
+ @Override
+ public boolean supportsStatistics() {
+ return false;
}
@Override
- public int getWriterOperatorType() {
- return CoreOperatorType.TEXT_WRITER_VALUE;
+ public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+ throw new UnsupportedOperationException("unimplemented");
}
@Override
- public boolean supportsPushDown() {
- return true;
+ public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+ throw new UnsupportedOperationException("unimplemented");
}
+ @Override
+ protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) {
+ long data = 0;
+ for (final CompleteFileWork work : scan.getWorkIterable()) {
+ data += work.getTotalBytes();
+ }
+ final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
+ final double estRowCount = data / estimatedRowSize;
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
new file mode 100644
index 000000000..0ed3155a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Original version of the "compliant" text reader. This is version 2 of
+ * the text reader. This version is retained for temporary backward
+ * compatibility as we productize the newer version 3 based on the
+ * row set framework.
+ * <p>
+ * TODO: Remove the files in this package and move the files from the
+ * "v3" sub-package here once the version 3 implementation stabilizes.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
new file mode 100644
index 000000000..6bf0bb69c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+
+public abstract class BaseFieldOutput extends TextOutput {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class);
+ private static final int MAX_FIELD_LENGTH = 1024 * 64;
+
+ // track which field is getting appended
+ protected int currentFieldIndex = -1;
+ // track chars within field
+ protected int currentDataPointer;
+ // track if field is still getting appended
+ private boolean fieldOpen = true;
+ // holds chars for a field
+ protected byte[] fieldBytes;
+ protected final RowSetLoader writer;
+ private final boolean[] projectionMask;
+ protected final int maxField;
+ protected boolean fieldProjected;
+
+ /**
+ * Initialize the field output for one of three scenarios:
+ * <ul>
+ * <li>SELECT all: SELECT *, SELECT columns. Indicated by a max field
+ * value other than -1.</li>
+ * <li>SELECT none: SELECT COUNT(*), etc. Indicated by a max field
+ * of -1.</li>
+ * <li>SELECT a, b, c indicated by a non-null projection mask that
+ * identifies the indexes of the fields to be selected. In this case,
+ * this constructor computes the maximum field.</li>
+ * </ul>
+ *
+ * @param writer Row set writer that provides access to the writer for
+ * each column
+ * @param maxField the index of the last field to store. May be -1 if no
+ * fields are to be stored. Computed if the projection mask is set
+ * @param projectionMask a boolean array indicating which fields are
+ * to be projected to the output. Optional
+ */
+
+ public BaseFieldOutput(RowSetLoader writer, int maxField, boolean[] projectionMask) {
+ this.writer = writer;
+ this.projectionMask = projectionMask;
+
+ // If no projection mask is defined, then we want all columns
+ // up to the max field, which may be -1 if we want to select
+ // nothing.
+
+ if (projectionMask == null) {
+ this.maxField = maxField;
+ } else {
+
+ // Otherwise, use the projection mask to determine
+ // which fields are to be projected. (The file may well
+ // contain more than the projected set.)
+
+ int end = projectionMask.length - 1;
+ while (end >= 0 && ! projectionMask[end]) {
+ end--;
+ }
+ this.maxField = end;
+ }
+
+ // If we project at least one field, allocate a buffer.
+
+ if (maxField >= 0) {
+ fieldBytes = new byte[MAX_FIELD_LENGTH];
+ }
+ }
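+
+ // Worked example: for SELECT a, c over a four-field file, the framework
+ // might pass projectionMask = {true, false, true, false}. The scan above
+ // then yields maxField = 2, and appends to fields 1 and 3 are silently
+ // dropped by startField()/append() below.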
+
+ /**
+ * Start a new record. Resets all pointers.
+ */
+
+ @Override
+ public void startRecord() {
+ currentFieldIndex = -1;
+ fieldOpen = false;
+ writer.start();
+ }
+
+ @Override
+ public void startField(int index) {
+ assert index == currentFieldIndex + 1;
+ currentFieldIndex = index;
+ currentDataPointer = 0;
+ fieldOpen = true;
+
+ // Figure out if this field is projected.
+
+ if (projectionMask == null) {
+ fieldProjected = currentFieldIndex <= maxField;
+ } else if (currentFieldIndex >= projectionMask.length) {
+ fieldProjected = false;
+ } else {
+ fieldProjected = projectionMask[currentFieldIndex];
+ }
+ }
+
+ @Override
+ public void append(byte data) {
+ if (! fieldProjected) {
+ return;
+ }
+ if (currentDataPointer >= MAX_FIELD_LENGTH - 1) {
+ throw UserException
+ .unsupportedError()
+ .message("Text column is too large.")
+ .addContext("Column", currentFieldIndex)
+ .addContext("Limit", MAX_FIELD_LENGTH)
+ .build(logger);
+ }
+
+ fieldBytes[currentDataPointer++] = data;
+ }
+
+ @Override
+ public boolean endField() {
+ fieldOpen = false;
+ return currentFieldIndex < maxField;
+ }
+
+ @Override
+ public boolean endEmptyField() {
+ return endField();
+ }
+
+ @Override
+ public void finishRecord() {
+ if (fieldOpen) {
+ endField();
+ }
+ writer.save();
+ }
+
+ @Override
+ public long getRecordCount() {
+ return writer.rowCount();
+ }
+
+ @Override
+ public boolean isFull() {
+ return writer.isFull();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
new file mode 100644
index 000000000..e489003c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * New text reader, complies with the RFC 4180 standard for text/csv files
+ */
+public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextBatchReader.class);
+
+ private static final int MAX_RECORDS_PER_BATCH = 8096;
+ private static final int READ_BUFFER = 1024 * 1024;
+ private static final int WHITE_SPACE_BUFFER = 64 * 1024;
+
+ // settings to be used while parsing
+ private final TextParsingSettingsV3 settings;
+ // Chunk of the file to be read by this reader
+ private final FileSplit split;
+ // text reader implementation
+ private TextReader reader;
+ // input buffer
+ private DrillBuf readBuffer;
+ // working buffer to handle whitespaces
+ private DrillBuf whitespaceBuffer;
+ private final DrillFileSystem dfs;
+
+ private RowSetLoader writer;
+
+ public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) {
+ this.split = split;
+ this.settings = settings;
+ this.dfs = dfs;
+
+ // Validate. Otherwise, these problems show up later as a data
+ // read error which is very confusing.
+
+ if (settings.getNewLineDelimiter().length == 0) {
+ throw UserException
+ .validationError()
+ .message("The text format line delimiter cannot be blank.")
+ .build(logger);
+ }
+ }
+
+ /**
+ * Performs the initial setup required for the record reader.
+ * Initializes the input stream, handling of the output record batch
+ * and the actual reader to be used.
+ * @param schemaNegotiator negotiator used to define the reader schema;
+ * also supplies the operator context from which buffers are allocated and managed
+ */
+
+ @Override
+ public boolean open(ColumnsSchemaNegotiator schemaNegotiator) {
+ final OperatorContext context = schemaNegotiator.context();
+
+ // Note: DO NOT use managed buffers here. They remain in existence
+ // until the fragment is shut down. The buffers here are large.
+ // If we scan 1000 files, and allocate 1 MB for each, we end up
+ // holding onto 1 GB of memory in managed buffers.
+ // Instead, we allocate the buffers explicitly, and must free
+ // them.
+
+ readBuffer = context.getAllocator().buffer(READ_BUFFER);
+ whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
+
+ // TODO: Set this based on size of record rather than
+ // absolute count.
+
+ schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH);
+
+ // setup Output, Input, and Reader
+ try {
+ TextOutput output;
+
+ if (settings.isHeaderExtractionEnabled()) {
+ output = openWithHeaders(schemaNegotiator);
+ } else {
+ output = openWithoutHeaders(schemaNegotiator);
+ }
+ if (output == null) {
+ return false;
+ }
+ openReader(output);
+ return true;
+ } catch (final IOException e) {
+ throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
+ }
+ }
+
+ /**
+ * Extract header and use that to define the reader schema.
+ */
+
+ private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException {
+ final String [] fieldNames = extractHeader();
+ if (fieldNames == null) {
+ return null;
+ }
+ final TupleMetadata schema = new TupleSchema();
+ for (final String colName : fieldNames) {
+ schema.addColumn(MetadataUtils.newScalar(colName, MinorType.VARCHAR, DataMode.REQUIRED));
+ }
+ schemaNegotiator.setTableSchema(schema, true);
+ writer = schemaNegotiator.build().writer();
+ return new FieldVarCharOutput(writer);
+ }
+
+ /**
+ * When no headers, create a single array column "columns".
+ */
+
+ private TextOutput openWithoutHeaders(
+ ColumnsSchemaNegotiator schemaNegotiator) {
+ final TupleMetadata schema = new TupleSchema();
+ schema.addColumn(MetadataUtils.newScalar(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR, DataMode.REPEATED));
+ schemaNegotiator.setTableSchema(schema, true);
+ writer = schemaNegotiator.build().writer();
+ return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes());
+ }
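+
+ // Usage note: with no headers, queries address fields by index through
+ // the "columns" array, e.g.
+ //
+ // SELECT columns[0] AS name, columns[2] AS city FROM dfs.`example.csv`;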
+
+ private void openReader(TextOutput output) throws IOException {
+ logger.trace("Opening file {}", split.getPath());
+ final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath());
+ final TextInput input = new TextInput(settings, stream, readBuffer,
+ split.getStart(), split.getStart() + split.getLength());
+
+ // setup Reader using Input and Output
+ reader = new TextReader(settings, input, output, whitespaceBuffer);
+ reader.start();
+ }
+
+ /**
+ * Extracts header from text file.
+ * Currently the header is assumed to be the first line if headerExtractionEnabled is set to true.
+ * TODO: enhance to support more common header patterns
+ * @return field name strings
+ */
+
+ private String [] extractHeader() throws IOException {
+ assert settings.isHeaderExtractionEnabled();
+
+ // don't skip the first line here, even if skipFirstLine is set: the header must be read
+ settings.setSkipFirstLine(false);
+
+ final HeaderBuilder hOutput = new HeaderBuilder(split.getPath());
+
+ // setup Input using InputStream
+ // we should read the file header irrespective of the split given to this reader
+ final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath());
+ final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength());
+
+ // setup Reader using Input and Output
+ this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer);
+ reader.start();
+
+ // extract first row only
+ reader.parseNext();
+
+ // grab the field names from output
+ final String [] fieldNames = hOutput.getHeaders();
+
+ // cleanup and set to skip the first line next time we read input
+ reader.close();
+ settings.setSkipFirstLine(true);
+
+ readBuffer.clear();
+ whitespaceBuffer.clear();
+ return fieldNames;
+ }
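+
+ // Worked example: a first line of "name,$price,name" yields headers
+ // {"name", "col_price", "name_2"}: HeaderBuilder (later in this patch)
+ // rewrites the invalid leading character and suffixes the duplicate.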
+
+ /**
+ * Generates the next record batch.
+ * @return false on the batch that hits EOF (the scan operator still
+ * processes any rows in that final batch); true if more input remains
+ */
+
+ @Override
+ public boolean next() {
+ reader.resetForNextBatch();
+
+ try {
+ boolean more = false;
+ while (! writer.isFull()) {
+ more = reader.parseNext();
+ if (! more) {
+ break;
+ }
+ }
+ reader.finishBatch();
+
+ // Return false on the batch that hits EOF. The scan operator
+ // knows to process any rows in this final batch.
+
+ return more && writer.rowCount() > 0;
+ } catch (IOException | TextParsingException e) {
+ if (e.getCause() != null && e.getCause() instanceof UserException) {
+ throw (UserException) e.getCause();
+ }
+ throw UserException.dataReadError(e)
+ .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
+ split.getPath(), reader.getPos())
+ .build(logger);
+ }
+ }
+
+ /**
+ * Cleanup state once we are finished processing all the records.
+ * This would internally close the input stream we are reading from.
+ */
+ @Override
+ public void close() {
+
+ // Release the buffers allocated above. Double-check to handle
+ // unexpected multiple calls to close().
+
+ if (readBuffer != null) {
+ readBuffer.release();
+ readBuffer = null;
+ }
+ if (whitespaceBuffer != null) {
+ whitespaceBuffer.release();
+ whitespaceBuffer = null;
+ }
+ try {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ } catch (final IOException e) {
+ logger.warn("Exception while closing stream.", e);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
new file mode 100644
index 000000000..df48a5548
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Class is responsible for generating record batches for text file inputs. We generate
+ * a record batch with a set of varchar vectors. A varchar vector contains all the field
+ * values for a given column. Each record is a single value within each vector of the set.
+ */
+class FieldVarCharOutput extends BaseFieldOutput {
+
+ /**
+ * Creates the output with one varchar vector per projected field. The
+ * projection mask is derived from the writer's schema: one entry per
+ * column, true if that column is projected in the query.
+ * @param writer row set writer whose schema defines the output columns
+ */
+ public FieldVarCharOutput(RowSetLoader writer) {
+ super(writer,
+ TextReader.MAXIMUM_NUMBER_COLUMNS,
+ makeMask(writer));
+ }
+
+ private static boolean[] makeMask(RowSetLoader writer) {
+ final TupleMetadata schema = writer.tupleSchema();
+ final boolean[] projectionMask = new boolean[schema.size()];
+ for (int i = 0; i < schema.size(); i++) {
+ projectionMask[i] = schema.metadata(i).isProjected();
+ }
+ return projectionMask;
+ }
+
+ @Override
+ public boolean endField() {
+ if (fieldProjected) {
+ writer.scalar(currentFieldIndex)
+ .setBytes(fieldBytes, currentDataPointer);
+ }
+
+ return super.endField();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
new file mode 100644
index 000000000..62eafc898
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+/**
+ * Text output that implements a header reader/parser.
+ * The caller parses out the characters of each header;
+ * this class assembles UTF-8 bytes into Unicode characters,
+ * fixes invalid characters (those not legal for SQL symbols),
+ * and maps duplicate names to unique names.
+ * <p>
+ * That is, this class is as permissive as possible with file
+ * headers to avoid spurious query failures for trivial reasons.
+ */
+
+// Note: this class uses Java heap strings and the usual Java
+// convenience classes. Since we do heavy Unicode string operations,
+// and read a single row, there is no good reason to try to use
+// value vectors and direct memory for this task.
+
+public class HeaderBuilder extends TextOutput {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class);
+
+ /**
+ * Maximum Drill symbol length, as enforced for headers.
+ * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+ * identifier documentation</a>
+ */
+ // TODO: Replace with the proper constant, if available
+ public static final int MAX_HEADER_LEN = 1024;
+
+ /**
+ * Prefix used to replace non-alphabetic characters at the start of
+ * a column name. For example, $foo becomes col_foo. Used
+ * because SQL does not allow _foo.
+ */
+
+ public static final String COLUMN_PREFIX = "col_";
+
+ /**
+ * Prefix used to create numbered columns for missing
+ * headers. Typical names: column_1, column_2, ...
+ */
+
+ public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
+
+ public final Path filePath;
+ public final List<String> headers = new ArrayList<>();
+ public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
+
+ public HeaderBuilder(Path filePath) {
+ this.filePath = filePath;
+ }
+
+ @Override
+ public void startField(int index) {
+ currentField.clear();
+ }
+
+ @Override
+ public boolean endField() {
+ String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8);
+ header = validateSymbol(header);
+ headers.add(header);
+ return true;
+ }
+
+ @Override
+ public boolean endEmptyField() {
+
+ // Empty header will be rewritten to "column_<n>".
+
+ return endField();
+ }
+
+ /**
+ * Validate the header name according to the SQL lexical rules.
+ * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+ * identifier documentation</a>
+ * @param header the header name to validate
+ */
+
+ // TODO: Replace with existing code, if any.
+ private String validateSymbol(String header) {
+ header = header.trim();
+
+ // To avoid unnecessary query failures, just make up a column name
+ // if the name is missing or all blanks.
+
+ if (header.isEmpty()) {
+ return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+ }
+ if (! Character.isAlphabetic(header.charAt(0))) {
+ return rewriteHeader(header);
+ }
+ for (int i = 1; i < header.length(); i++) {
+ char ch = header.charAt(i);
+ if (! Character.isAlphabetic(ch) &&
+ ! Character.isDigit(ch) && ch != '_') {
+ return rewriteHeader(header);
+ }
+ }
+ return header;
+ }
+
+ /**
+ * Given an invalid header, rewrite it to replace illegal characters
+ * with valid ones. The header won't be what the user specified,
+ * but it will be a valid SQL identifier. This solution avoids failing
+ * queries due to corrupted or invalid header data.
+ * <p>
+ * Names with invalid first characters are prefixed with "col_". Example:
+ * $foo maps to col_foo. If the only character is non-alphabetic, treat
+ * the column as anonymous and create a generic name: column_4, etc.
+ * <p>
+ * This mapping could create a column that exceeds the maximum length
+ * of 1024. Since that is not really a hard limit, we just live with the
+ * extra few characters.
+ *
+ * @param header the original header
+ * @return the rewritten header, valid for SQL
+ */
+
+ private String rewriteHeader(String header) {
+ final StringBuilder buf = new StringBuilder();
+
+ // If starts with non-alphabetic, can't map the character to
+ // underscore, so just tack on a prefix.
+
+ char ch = header.charAt(0);
+ if (Character.isAlphabetic(ch)) {
+ buf.append(ch);
+ } else if (Character.isDigit(ch)) {
+ buf.append(COLUMN_PREFIX);
+ buf.append(ch);
+
+ // For the strange case of only one character, format
+ // the same as an empty header.
+
+ } else if (header.length() == 1) {
+ return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+ } else {
+ buf.append(COLUMN_PREFIX);
+ }
+
+ // Convert all remaining invalid characters to underscores
+
+ for (int i = 1; i < header.length(); i++) {
+ ch = header.charAt(i);
+ if (Character.isAlphabetic(ch) ||
+ Character.isDigit(ch) || ch == '_') {
+ buf.append(ch);
+ } else {
+ buf.append("_");
+ }
+ }
+ return buf.toString();
+ }
+
+ @Override
+ public void append(byte data) {
+
+ // Ensure the data fits. Note that, if the name is Unicode, the actual
+ // number of characters might be less than the limit even though the
+ // byte count exceeds the limit. Fixing this, in general, would require
+ // a buffer four times larger, so we leave that as a later improvement
+ // if ever needed.
+
+ try {
+ currentField.put(data);
+ } catch (BufferOverflowException e) {
+ throw UserException.dataReadError()
+ .message("Column exceeds maximum length of %d", MAX_HEADER_LEN)
+ .addContext("File Path", filePath.toString())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void finishRecord() {
+ if (headers.isEmpty()) {
+ throw UserException.dataReadError()
+ .message("The file must define at least one header.")
+ .addContext("File Path", filePath.toString())
+ .build(logger);
+ }
+
+ // Force headers to be unique.
+
+ final Set<String> idents = new HashSet<>();
+ for (int i = 0; i < headers.size(); i++) {
+ String header = headers.get(i);
+ String key = header.toLowerCase();
+
+ // Is the header a duplicate?
+
+ if (idents.contains(key)) {
+
+ // Make header unique by appending a suffix.
+ // This loop must end because we have a finite
+ // number of headers.
+ // The original column is assumed to be "1", so
+ // the first duplicate is "2", and so on.
+ // Note that this will map columns of the form:
+ // "col,col,col_2,col_2_2" to
+ // "col", "col_2", "col_2_2", "col_2_2_2".
+ // No mapping scheme is perfect...
+
+ for (int l = 2; ; l++) {
+ final String rewritten = header + "_" + l;
+ key = rewritten.toLowerCase();
+ if (! idents.contains(key)) {
+ headers.set(i, rewritten);
+ break;
+ }
+ }
+ }
+ idents.add(key);
+ }
+ }
+
+ @Override
+ public void startRecord() { }
+
+ public String[] getHeaders() {
+
+ // Just return the headers: any needed checks were done in
+ // finishRecord()
+
+ final String[] array = new String[headers.size()];
+ return headers.toArray(array);
+ }
+
+ // Not used.
+ @Override
+ public long getRecordCount() { return 0; }
+
+ @Override
+ public boolean isFull() { return false; }
+}
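
Taken together, validateSymbol() and rewriteHeader() form a small sanitizing pipeline: trim, substitute a generated name for blank headers, prefix bad leading characters, and map remaining illegal characters to underscores. A minimal standalone sketch of the same rules (the prefix constants here are stand-ins for the class's actual ANONYMOUS_COLUMN_PREFIX and COLUMN_PREFIX values, and the position argument stands in for headers.size() + 1):

    // Sketch of the header-sanitizing rules above; not the class itself.
    public class HeaderSanitizerSketch {
      private static final String COL_PREFIX = "col_";      // stand-in for COLUMN_PREFIX
      private static final String ANON_PREFIX = "column_";  // stand-in for ANONYMOUS_COLUMN_PREFIX

      public static String sanitize(String header, int position) {
        header = header.trim();
        if (header.isEmpty()) {
          return ANON_PREFIX + position;        // "" -> column_4, etc.
        }
        StringBuilder buf = new StringBuilder();
        char first = header.charAt(0);
        if (Character.isAlphabetic(first)) {
          buf.append(first);
        } else if (Character.isDigit(first)) {
          buf.append(COL_PREFIX).append(first); // "9col" -> col_9col
        } else if (header.length() == 1) {
          return ANON_PREFIX + position;        // "$" -> column_4, etc.
        } else {
          buf.append(COL_PREFIX);               // "$foo" -> col_foo
        }
        for (int i = 1; i < header.length(); i++) {
          char ch = header.charAt(i);
          buf.append(Character.isAlphabetic(ch) || Character.isDigit(ch) || ch == '_' ? ch : '_');
        }
        return buf.toString();                  // "a b" -> a_b
      }

      public static void main(String[] args) {
        System.out.println(sanitize("$foo", 1)); // col_foo
        System.out.println(sanitize("a b", 2));  // a_b
        System.out.println(sanitize("", 3));     // column_3
      }
    }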
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
new file mode 100644
index 000000000..13b44509e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Class is responsible for generating record batches for text file inputs. We generate
+ * a record batch with a single vector of type repeated varchar vector. Each record is a single
+ * value within the vector containing all the fields in the record as individual array elements.
+ */
+public class RepeatedVarCharOutput extends BaseFieldOutput {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedVarCharOutput.class);
+
+ private final ScalarWriter columnWriter;
+ private final ArrayWriter arrayWriter;
+
+ /**
+ * Provide the row set loader (which must have just one repeated Varchar
+ * column) and an optional array projection mask.
+ * @param loader the row set loader, which must have just one repeated VarChar column
+ * @param projectionMask optional mask of projected array elements; null to project all
+ */
+
+ public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) {
+ super(loader,
+ maxField(loader, projectionMask),
+ projectionMask);
+ arrayWriter = writer.array(0);
+ columnWriter = arrayWriter.scalar();
+ }
+
+ private static int maxField(RowSetLoader loader, boolean[] projectionMask) {
+
+ // If the one and only field (`columns`) is not selected, then this
+ // is a COUNT(*) or similar query. Select nothing.
+
+ if (! loader.tupleSchema().metadata(0).isProjected()) {
+ return -1;
+ }
+
+ // If this is SELECT * or SELECT `columns` query, project all
+ // possible fields.
+
+ if (projectionMask == null) {
+ return TextReader.MAXIMUM_NUMBER_COLUMNS;
+ }
+
+ // Else, this is a SELECT columns[x], columns[y], ... query.
+ // Project only the requested element members (fields).
+
+ int end = projectionMask.length - 1;
+ while (end >= 0 && ! projectionMask[end]) {
+ end--;
+ }
+ return end;
+ }
+
+ /**
+ * Write the value into an array position. Rules:
+ * <ul>
+ * <li>If there is no projection mask, collect all columns.</li>
+ * <li>If a selection mask is present, we previously found the index
+ * of the last projection column (<tt>maxField</tt>). If the current
+ * column is beyond that number, ignore the data and stop accepting
+ * columns.</li>
+ * <li>If the column is projected, add the data to the array.</li>
+ * <li>If the column is not projected, add a blank value to the
+ * array.</li>
+ * </ul>
+ * The above ensures that we leave no holes in the portion of the
+ * array that is projected (by adding blank columns where needed),
+ * and we just ignore columns past the end of the projected part
+ * of the array. (No need to fill holes at the end.)
+ */
+
+ @Override
+ public boolean endField() {
+
+ // Skip the field if past the set of projected fields.
+
+ if (currentFieldIndex > maxField) {
+ return false;
+ }
+
+ // If the field is projected, save it.
+
+ if (fieldProjected) {
+
+ // Repeated var char will create as many entries as there are columns.
+ // If this would exceed the maximum, issue an error. Note that we do
+ // this only if all fields are selected; the same query will succeed if
+ // the user does a COUNT(*) or SELECT columns[x], columns[y], ...
+
+ if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
+ throw UserException
+ .unsupportedError()
+ .message("Text file contains too many fields")
+ .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS)
+ .build(logger);
+ }
+
+ // Save the field.
+
+ columnWriter.setBytes(fieldBytes, currentDataPointer);
+ } else {
+
+ // The field is not projected.
+ // Must write a value into this array position, but
+ // the value should be empty.
+
+ columnWriter.setBytes(fieldBytes, 0);
+ }
+
+ // Return whether the rest of the fields should be read.
+
+ return super.endField();
+ }
+}
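
The projection-mask handling above is easiest to see with a concrete query. A hedged sketch of how a mask for SELECT columns[0], columns[2] drives maxField and the per-field outcomes (the mask values here are hypothetical; the real mask comes from the scan projection):

    // Sketch: how a projection mask for SELECT columns[0], columns[2]
    // maps to the endField() behavior described above.
    public class ProjectionMaskSketch {
      public static void main(String[] args) {
        boolean[] mask = {true, false, true};  // columns[0] and columns[2] projected

        // Last projected index, as in maxField(): scan back from the end.
        int maxField = mask.length - 1;
        while (maxField >= 0 && !mask[maxField]) {
          maxField--;
        }
        System.out.println("maxField = " + maxField);  // 2

        // Per-field outcome for a 5-field input record.
        for (int field = 0; field < 5; field++) {
          if (field > maxField) {
            System.out.println("field " + field + ": skipped (past projection)");
          } else if (mask[field]) {
            System.out.println("field " + field + ": data written");
          } else {
            System.out.println("field " + field + ": blank filler (no holes)");
          }
        }
      }
    }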
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
new file mode 100644
index 000000000..70c43b7ee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+class StreamFinishedPseudoException extends RuntimeException {
+
+ public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException();
+
+ private StreamFinishedPseudoException() {
+ super("", null, false, true);
+
+ }
+
+}
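
The four-argument RuntimeException constructor used above disables suppression but still captures a stack trace (the final writableStackTrace argument is true); for a shared singleton that trace is captured once at class initialization. A sketch of the fully stackless variant of this control-flow pattern, should the trace ever prove unnecessary (this is a variant, not the committed code):

    // Sketch of a fully stackless control-flow exception: no suppression,
    // no stack trace, safe to share as a singleton.
    class StacklessEndOfStream extends RuntimeException {
      static final StacklessEndOfStream INSTANCE = new StacklessEndOfStream();

      private StacklessEndOfStream() {
        // message, cause, enableSuppression, writableStackTrace
        super("", null, false, false);
      }
    }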
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
new file mode 100644
index 000000000..26fade6d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
+
+/**
+ * Class that fronts an InputStream to provide a byte consumption interface.
+ * Also manages reading only whole lines within each split.
+ */
+final class TextInput {
+
+ private final byte[] lineSeparator;
+ private final byte normalizedLineSeparator;
+ private final TextParsingSettingsV3 settings;
+
+ private long lineCount;
+ private long charCount;
+
+ /**
+ * The starting position in the file.
+ */
+ private final long startPos;
+ private final long endPos;
+
+ private long streamPos;
+
+ private final Seekable seekable;
+ private final FSDataInputStream inputFS;
+ private final InputStream input;
+
+ private final DrillBuf buffer;
+ private final ByteBuffer underlyingBuffer;
+ private final long bStart;
+ private final long bStartMinus1;
+
+ private final boolean bufferReadable;
+
+ /**
+ * Index of the last byte of a possible partial line separator that was
+ * dropped from the previous read; these bytes are prepended to the next
+ * read. -1 if there is no such remnant.
+ */
+ private int remByte = -1;
+
+ /**
+ * The current position in the buffer.
+ */
+ public int bufferPtr;
+
+ /**
+ * The quantity of valid data in the buffer.
+ */
+ public int length = -1;
+
+ private boolean endFound = false;
+
+ /**
+ * Creates a new instance with the mandatory characters for handling newlines
+ * transparently. The settings provide the line separator (the sequence of
+ * bytes that represents a newline in the input) and the normalized newline
+ * character that is used to replace any line separator sequence found in
+ * the input.
+ */
+ public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+ this.lineSeparator = settings.getNewLineDelimiter();
+ byte normalizedLineSeparator = settings.getNormalizedNewLine();
+ Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
+ boolean isCompressed = input instanceof CompressionInputStream;
+ Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
+
+ // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely.
+ if (isCompressed && endPos > 0) {
+ endPos = Long.MAX_VALUE;
+ }
+
+ this.input = input;
+ this.seekable = (Seekable) input;
+ this.settings = settings;
+
+ if (input instanceof FSDataInputStream) {
+ this.inputFS = (FSDataInputStream) input;
+ this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
+ } else {
+ this.inputFS = null;
+ this.bufferReadable = false;
+ }
+
+ this.startPos = startPos;
+ this.endPos = endPos;
+
+ this.normalizedLineSeparator = normalizedLineSeparator;
+
+ this.buffer = readBuffer;
+ this.bStart = buffer.memoryAddress();
+ this.bStartMinus1 = bStart -1;
+ this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
+ }
+
+ /**
+ * Position the input for the start of reading. If this is a non-zero split,
+ * or skipFirstLine is enabled, the input moves to the next complete line.
+ * @throws IOException for input file read errors
+ */
+ final void start() throws IOException {
+ lineCount = 0;
+ if(startPos > 0){
+ seekable.seek(startPos);
+ }
+
+ updateBuffer();
+ if (length > 0) {
+ if (startPos > 0 || settings.isSkipFirstLine()) {
+
+ // move to next full record.
+ skipLines(1);
+ }
+ }
+ }
+
+
+ /**
+ * Helper method to get the most recent characters consumed since the last record started.
+ * May return an incomplete string since we don't support stream rewind. Currently returns a single placeholder space.
+ *
+ * @return String of last few bytes.
+ * @throws IOException for input file read errors
+ */
+ public String getStringSinceMarkForError() throws IOException {
+ return " ";
+ }
+
+ long getPos() {
+ return streamPos + bufferPtr;
+ }
+
+ public void mark() { }
+
+ /**
+ * Read some more bytes from the stream. Uses the zero copy interface if available.
+ * Otherwise, does byte copy.
+ *
+ * @throws IOException for input file read errors
+ */
+ private void read() throws IOException {
+ if (bufferReadable) {
+
+ if (remByte != -1) {
+ for (int i = 0; i <= remByte; i++) {
+ underlyingBuffer.put(lineSeparator[i]);
+ }
+ remByte = -1;
+ }
+ length = inputFS.read(underlyingBuffer);
+
+ } else {
+ byte[] b = new byte[underlyingBuffer.capacity()];
+ if (remByte != -1){
+ int remBytesNum = remByte + 1;
+ System.arraycopy(lineSeparator, 0, b, 0, remBytesNum);
+ length = input.read(b, remBytesNum, b.length - remBytesNum);
+ remByte = -1;
+ } else {
+ length = input.read(b);
+ }
+ underlyingBuffer.put(b);
+ }
+ }
+
+
+ /**
+ * Read more data into the buffer. Will also manage split end conditions.
+ *
+ * @throws IOException for input file read errors
+ */
+ private void updateBuffer() throws IOException {
+ streamPos = seekable.getPos();
+ underlyingBuffer.clear();
+
+ if (endFound) {
+ length = -1;
+ return;
+ }
+
+ read();
+
+ // check our data read allowance.
+ if (streamPos + length >= this.endPos) {
+ updateLengthBasedOnConstraint();
+ }
+
+ charCount += bufferPtr;
+ bufferPtr = 1;
+
+ buffer.writerIndex(underlyingBuffer.limit());
+ buffer.readerIndex(underlyingBuffer.position());
+ }
+
+ /**
+ * Checks to see if we can go over the end of our bytes constraint on the data. If so,
+ * adjusts so that we can only read to the last character of the first line that crosses
+ * the split boundary.
+ */
+ private void updateLengthBasedOnConstraint() {
+ final long max = bStart + length;
+ for (long m = bStart + (endPos - streamPos); m < max; m++) {
+ for (int i = 0; i < lineSeparator.length; i++) {
+ long mPlus = m + i;
+ if (mPlus < max) {
+ // we found a line separator and don't need to consult the next byte.
+ if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) {
+ length = (int) (mPlus - bStart) + 1;
+ endFound = true;
+ return;
+ }
+ } else {
+ // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read.
+ remByte = i;
+ length = length - i;
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Get next byte from stream. Also maintains the current line count. Will throw a
+ * {@link StreamFinishedPseudoException} when the stream has run out of bytes.
+ *
+ * @return next byte from stream.
+ * @throws IOException for input file read errors
+ */
+ public final byte nextChar() throws IOException {
+ byte byteChar = nextCharNoNewLineCheck();
+ int bufferPtrTemp = bufferPtr - 1;
+ if (byteChar == lineSeparator[0]) {
+ for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
+ if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) {
+ return byteChar;
+ }
+ }
+
+ lineCount++;
+ byteChar = normalizedLineSeparator;
+
+ // we don't need to update buffer position if line separator is one byte long
+ if (lineSeparator.length > 1) {
+ bufferPtr += (lineSeparator.length - 1);
+ if (bufferPtr >= length) {
+ if (length != -1) {
+ updateBuffer();
+ } else {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+ }
+ }
+ }
+
+ return byteChar;
+ }
+
+ /**
+ * Get next byte from stream. Does not maintain the line count. Will throw a StreamFinishedPseudoException
+ * when the stream has run out of bytes.
+ *
+ * @return next byte from stream.
+ * @throws IOException for input file read errors
+ */
+ public final byte nextCharNoNewLineCheck() throws IOException {
+
+ if (length == -1) {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+
+ rangeCheck(buffer, bufferPtr - 1, bufferPtr);
+
+ byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr);
+
+ if (bufferPtr >= length) {
+ if (length != -1) {
+ updateBuffer();
+ bufferPtr--;
+ } else {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+ }
+
+ bufferPtr++;
+ return byteChar;
+ }
+
+ /**
+ * Number of lines read since the start of this split.
+ * @return current line count
+ */
+ public final long lineCount() {
+ return lineCount;
+ }
+
+ /**
+ * Skip forward the number of line delimiters. If you are in the middle of a line,
+ * a value of 1 will skip to the start of the next record.
+ *
+ * @param lines Number of lines to skip.
+ * @throws IOException for input file read errors
+ * @throws IllegalArgumentException if unable to skip the requested number
+ * of lines
+ */
+ public final void skipLines(int lines) throws IOException {
+ if (lines < 1) {
+ return;
+ }
+ long expectedLineCount = this.lineCount + lines;
+
+ try {
+ do {
+ nextChar();
+ } while (lineCount < expectedLineCount);
+ if (lineCount < lines) {
+ throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+ }
+ } catch (EOFException ex) {
+ throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+ }
+ }
+
+ public final long charCount() {
+ return charCount + bufferPtr;
+ }
+
+ public long getLineCount() {
+ return lineCount;
+ }
+
+ public void close() throws IOException{
+ input.close();
+ }
+}
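
The interplay of start() (skip a partial first line when startPos is non-zero) and updateLengthBasedOnConstraint() (read past endPos to finish the line that crosses the split boundary) implements a line-ownership convention: a line belongs to the split in which it starts. A toy model of that rule, independent of the buffer machinery above:

    // Sketch of the split-ownership convention TextInput implements:
    // a split skips its first (possibly partial) line unless it starts at 0,
    // and reads past its end to finish the line that crosses the boundary.
    import java.util.ArrayList;
    import java.util.List;

    public class SplitOwnershipSketch {
      static List<String> linesOwned(String data, int start, int end) {
        int pos = start;
        if (start > 0) {                      // skip to the next full line
          while (pos < data.length() && data.charAt(pos - 1) != '\n') {
            pos++;
          }
        }
        List<String> lines = new ArrayList<>();
        while (pos < data.length() && pos < end) {  // a line that *starts* before
          int nl = data.indexOf('\n', pos);         // end belongs to this split,
          if (nl < 0) nl = data.length();           // even if it finishes after it
          lines.add(data.substring(pos, nl));
          pos = nl + 1;
        }
        return lines;
      }

      public static void main(String[] args) {
        String data = "aaa\nbbbbbb\nccc\n";
        System.out.println(linesOwned(data, 0, 6));  // [aaa, bbbbbb]
        System.out.println(linesOwned(data, 6, 15)); // [ccc]
      }
    }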
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
new file mode 100644
index 000000000..48c184991
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+
+/**
+ * Base class for producing output record batches while dealing with
+ * text files. Defines the interface called from text parsers to create
+ * the corresponding value vectors (record batch).
+ */
+
+abstract class TextOutput {
+
+ public abstract void startRecord();
+
+ /**
+ * Start processing a new field within a record.
+ * @param index index within the record
+ */
+ public abstract void startField(int index);
+
+ /**
+ * End processing a field within a record.
+ * @return true if engine should continue processing record. false if rest of record can be skipped.
+ */
+ public abstract boolean endField();
+
+ /**
+ * Shortcut that lets the output know that we are ending a field with no data.
+ * @return true if engine should continue processing record. false if rest of record can be skipped.
+ */
+ public abstract boolean endEmptyField();
+
+ /**
+ * Add the provided data but drop any whitespace.
+ * @param data character to append
+ */
+ public void appendIgnoringWhitespace(byte data) {
+ if (! TextReader.isWhite(data)) {
+ append(data);
+ }
+ }
+
+ /**
+ * Appends a byte to the output character data buffer
+ * @param data current byte read
+ */
+ public abstract void append(byte data);
+
+ /**
+ * Completes the processing of a given record. Also completes the processing of the
+ * last field being read.
+ */
+ public abstract void finishRecord();
+
+ /**
+ * Return the total number of records (across batches) processed
+ */
+ public abstract long getRecordCount();
+
+ /**
+ * Indicates if the current batch is full and reading for this batch
+ * should stop.
+ *
+ * @return true if the batch is full and the reader must exit to allow
+ * the batch to be sent downstream, false if the reader may continue to
+ * add rows to the current batch
+ */
+
+ public abstract boolean isFull();
+}
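
The calling protocol is startRecord(), then startField()/append()/endField() per field, then finishRecord(). A sketch of a trivial implementation that collects each record into strings makes the contract concrete; it ignores projection and batch-size concerns, and would need to live in this package to actually extend the package-private TextOutput:

    // Sketch: a trivial output sink that collects fields into strings.
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    class StringListOutput /* extends TextOutput */ {
      private final List<List<String>> records = new ArrayList<>();
      private List<String> current;
      private final ByteArrayOutputStream field = new ByteArrayOutputStream();
      private long recordCount;

      public void startRecord() { current = new ArrayList<>(); }

      public void startField(int index) { field.reset(); }

      public boolean endField() {
        current.add(field.toString());
        return true;                    // keep reading this record
      }

      public boolean endEmptyField() {
        current.add("");
        return true;
      }

      public void append(byte data) { field.write(data); }

      public void finishRecord() {
        records.add(current);
        recordCount++;
      }

      public long getRecordCount() { return recordCount; }

      public boolean isFull() { return false; }  // never signals a full batch

      public List<List<String>> records() { return records; }
    }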
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
new file mode 100644
index 000000000..86cad4c88
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.io.IOException;
+
+import com.univocity.parsers.common.ParsingContext;
+
+class TextParsingContext implements ParsingContext {
+
+ private final TextInput input;
+ private final TextOutput output;
+ protected boolean stopped;
+
+ private int[] extractedIndexes;
+
+ public TextParsingContext(TextInput input, TextOutput output) {
+ this.input = input;
+ this.output = output;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long currentLine() {
+ return input.lineCount();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long currentChar() {
+ return input.charCount();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int currentColumn() {
+ return -1;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String[] headers() {
+ return new String[]{};
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int[] extractedFieldIndexes() {
+ return extractedIndexes;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long currentRecord() {
+ return output.getRecordCount();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String currentParsedContent() {
+ try {
+ return input.getStringSinceMarkForError();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void skipLines(int lines) {
+ }
+
+ @Override
+ public boolean columnsReordered() {
+ return false;
+ }
+
+ public boolean isFull() {
+ return output.isFull();
+ }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
new file mode 100644
index 000000000..0341b4554
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+// TODO: Remove the "V3" suffix once the V2 version is retired.
+public class TextParsingSettingsV3 {
+
+ public static final TextParsingSettingsV3 DEFAULT = new TextParsingSettingsV3();
+
+ private String emptyValue = null;
+ private boolean parseUnescapedQuotes = true;
+ private byte quote = b('"');
+ private byte quoteEscape = b('"');
+ private byte delimiter = b(',');
+ private byte comment = b('#');
+
+ private long maxCharsPerColumn = Character.MAX_VALUE;
+ private byte normalizedNewLine = b('\n');
+ private byte[] newLineDelimiter = {normalizedNewLine};
+ private boolean ignoreLeadingWhitespaces;
+ private boolean ignoreTrailingWhitespaces;
+ private String lineSeparatorString = "\n";
+ private boolean skipFirstLine;
+
+ private boolean headerExtractionEnabled;
+ private boolean useRepeatedVarChar = true;
+
+ public void set(TextFormatConfig config){
+ this.quote = bSafe(config.getQuote(), "quote");
+ this.quoteEscape = bSafe(config.getEscape(), "escape");
+ this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
+ this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
+ this.comment = bSafe(config.getComment(), "comment");
+ this.skipFirstLine = config.isSkipFirstLine();
+ this.headerExtractionEnabled = config.isHeaderExtractionEnabled();
+ if (this.headerExtractionEnabled) {
+ // When a header is present, the reader uses a set of VarChar vectors rather than a RepeatedVarChar vector
+ this.useRepeatedVarChar = false;
+ }
+ }
+
+ public byte getComment() {
+ return comment;
+ }
+
+ public boolean isSkipFirstLine() {
+ return skipFirstLine;
+ }
+
+ public void setSkipFirstLine(boolean skipFirstLine) {
+ this.skipFirstLine = skipFirstLine;
+ }
+
+ public boolean isUseRepeatedVarChar() {
+ return useRepeatedVarChar;
+ }
+
+ public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
+ this.useRepeatedVarChar = useRepeatedVarChar;
+ }
+
+ private static byte bSafe(char c, String name) {
+ if (c > Byte.MAX_VALUE) {
+ throw new IllegalArgumentException(String.format("Failure validating configuration option %s. Expected a "
+ + "character between 0 and 127 but value was actually %d.", name, (int) c));
+ }
+ return (byte) c;
+ }
+
+ private static byte b(char c) {
+ return (byte) c;
+ }
+
+ public byte[] getNewLineDelimiter() {
+ return newLineDelimiter;
+ }
+
+ /**
+ * Returns the character used for escaping values where the field delimiter is
+ * part of the value. Defaults to '"'
+ *
+ * @return the quote character
+ */
+ public byte getQuote() {
+ return quote;
+ }
+
+ /**
+ * Defines the character used for escaping values where the field delimiter is
+ * part of the value. Defaults to '"'
+ *
+ * @param quote
+ * the quote character
+ */
+ public void setQuote(byte quote) {
+ this.quote = quote;
+ }
+
+ public String getLineSeparatorString() {
+ return lineSeparatorString;
+ }
+
+ /**
+ * Identifies whether or not a given character is used for escaping values
+ * where the field delimiter is part of the value
+ *
+ * @param ch
+ * the character to be verified
+ * @return true if the given character is the character used for escaping
+ * values, false otherwise
+ */
+ public boolean isQuote(byte ch) {
+ return this.quote == ch;
+ }
+
+ /**
+ * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"'
+ * @return the quote escape character
+ */
+ public byte getQuoteEscape() {
+ return quoteEscape;
+ }
+
+ /**
+ * Defines the character used for escaping quotes inside an already quoted
+ * value. Defaults to '"'
+ *
+ * @param quoteEscape
+ * the quote escape character
+ */
+ public void setQuoteEscape(byte quoteEscape) {
+ this.quoteEscape = quoteEscape;
+ }
+
+ /**
+ * Identifies whether or not a given character is used for escaping quotes
+ * inside an already quoted value.
+ *
+ * @param ch
+ * the character to be verified
+ * @return true if the given character is the quote escape character, false
+ * otherwise
+ */
+ public boolean isQuoteEscape(byte ch) {
+ return this.quoteEscape == ch;
+ }
+
+ /**
+ * Returns the field delimiter character. Defaults to ','
+ * @return the field delimiter character
+ */
+ public byte getDelimiter() {
+ return delimiter;
+ }
+
+ /**
+ * Defines the field delimiter character. Defaults to ','
+ * @param delimiter the field delimiter character
+ */
+ public void setDelimiter(byte delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ /**
+ * Identifies whether or not a given character represents a field delimiter
+ * @param ch the character to be verified
+ * @return true if the given character is the field delimiter character, false otherwise
+ */
+ public boolean isDelimiter(byte ch) {
+ return this.delimiter == ch;
+ }
+
+ /**
+ * Returns the String representation of an empty value (defaults to null)
+ *
+ * <p>
+ * When reading, if the parser does not read any character from the input, and
+ * the input is within quotes, the empty value is used instead of an empty string
+ *
+ * @return the String representation of an empty value
+ */
+ public String getEmptyValue() {
+ return emptyValue;
+ }
+
+ /**
+ * Sets the String representation of an empty value (defaults to null)
+ *
+ * <p>
+ * When reading, if the parser does not read any character from the input, and
+ * the input is within quotes, the empty value is used instead of an empty string
+ *
+ * @param emptyValue
+ * the String representation of an empty value
+ */
+ public void setEmptyValue(String emptyValue) {
+ this.emptyValue = emptyValue;
+ }
+
+ /**
+ * Indicates whether the CSV parser should accept unescaped quotes inside
+ * quoted values and parse them normally. Defaults to {@code true}.
+ *
+ * @return a flag indicating whether or not the CSV parser should accept
+ * unescaped quotes inside quoted values.
+ */
+ public boolean isParseUnescapedQuotes() {
+ return parseUnescapedQuotes;
+ }
+
+ /**
+ * Configures how to handle unescaped quotes inside quoted values. If set to
+ * {@code true}, the parser will parse the quote normally as part of the
+ * value. If set to {@code false}, a
+ * {@link com.univocity.parsers.common.TextParsingException} will be thrown.
+ * Defaults to {@code true}.
+ *
+ * @param parseUnescapedQuotes
+ * indicates whether or not the CSV parser should accept unescaped
+ * quotes inside quoted values.
+ */
+ public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
+ this.parseUnescapedQuotes = parseUnescapedQuotes;
+ }
+
+ /**
+ * Indicates whether or not the first valid record parsed from the input
+ * should be considered as the row containing the names of each column
+ *
+ * @return true if the first valid record parsed from the input should be
+ * considered as the row containing the names of each column, false
+ * otherwise
+ */
+ public boolean isHeaderExtractionEnabled() {
+ return headerExtractionEnabled;
+ }
+
+ /**
+ * Defines whether or not the first valid record parsed from the input should
+ * be considered as the row containing the names of each column
+ *
+ * @param headerExtractionEnabled
+ * a flag indicating whether the first valid record parsed from the
+ * input should be considered as the row containing the names of each
+ * column
+ */
+ public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
+ this.headerExtractionEnabled = headerExtractionEnabled;
+ }
+
+ public long getMaxCharsPerColumn() {
+ return maxCharsPerColumn;
+ }
+
+ public void setMaxCharsPerColumn(long maxCharsPerColumn) {
+ this.maxCharsPerColumn = maxCharsPerColumn;
+ }
+
+ public void setComment(byte comment) {
+ this.comment = comment;
+ }
+
+ public byte getNormalizedNewLine() {
+ return normalizedNewLine;
+ }
+
+ public void setNormalizedNewLine(byte normalizedNewLine) {
+ this.normalizedNewLine = normalizedNewLine;
+ }
+
+ public boolean isIgnoreLeadingWhitespaces() {
+ return ignoreLeadingWhitespaces;
+ }
+
+ public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
+ this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
+ }
+
+ public boolean isIgnoreTrailingWhitespaces() {
+ return ignoreTrailingWhitespaces;
+ }
+
+ public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
+ this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
+ }
+}
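
As a usage sketch, hand-configuring the settings for a tab-separated file with a header row might look like the following. Normally set(TextFormatConfig) does this from the format plugin config (and flips useRepeatedVarChar automatically); the scenario here is hypothetical and assumes the Drill classes are on the classpath:

    import org.apache.drill.exec.store.easy.text.compliant.v3.TextParsingSettingsV3;

    public class TsvSettingsSketch {
      public static void main(String[] args) {
        TextParsingSettingsV3 settings = new TextParsingSettingsV3();
        settings.setDelimiter((byte) '\t');         // TSV field delimiter
        settings.setHeaderExtractionEnabled(true);  // first row holds column names
        settings.setUseRepeatedVarChar(false);      // per-column vectors, not `columns`
        settings.setSkipFirstLine(false);           // header row is consumed, not skipped
        System.out.println((char) settings.getDelimiter());
      }
    }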
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
new file mode 100644
index 000000000..17a076c0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.UserException;
+
+import com.univocity.parsers.common.TextParsingException;
+
+/*******************************************************************************
+ * Portions Copyright 2014 uniVocity Software Pty Ltd
+ ******************************************************************************/
+
+/**
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
+ * DrillBuf support.
+ */
+public final class TextReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
+
+ private static final byte NULL_BYTE = (byte) '\0';
+
+ public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+
+ private final TextParsingContext context;
+
+ private final TextParsingSettingsV3 settings;
+
+ private final TextInput input;
+ private final TextOutput output;
+ private final DrillBuf workBuf;
+
+ private byte ch;
+
+ // index of the field within this record
+ private int fieldIndex;
+
+ /** Behavior settings **/
+ private final boolean ignoreTrailingWhitespace;
+ private final boolean ignoreLeadingWhitespace;
+ private final boolean parseUnescapedQuotes;
+
+ /** Key Characters **/
+ private final byte comment;
+ private final byte delimiter;
+ private final byte quote;
+ private final byte quoteEscape;
+ private final byte newLine;
+
+ /**
+ * The text reader supports all settings provided by {@link TextParsingSettingsV3},
+ * and requires this configuration to be properly initialized.
+ * @param settings the parser configuration
+ * @param input input stream
+ * @param output interface to produce output record batch
+ * @param workBuf working buffer to handle whitespace
+ */
+ public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) {
+ this.context = new TextParsingContext(input, output);
+ this.workBuf = workBuf;
+ this.settings = settings;
+
+ this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces();
+ this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces();
+ this.parseUnescapedQuotes = settings.isParseUnescapedQuotes();
+ this.delimiter = settings.getDelimiter();
+ this.quote = settings.getQuote();
+ this.quoteEscape = settings.getQuoteEscape();
+ this.newLine = settings.getNormalizedNewLine();
+ this.comment = settings.getComment();
+
+ this.input = input;
+ this.output = output;
+ }
+
+ public TextOutput getOutput() { return output; }
+
+ /**
+ * Check if the given byte is a white space. As per the univocity text reader
+ * any ASCII value <= ' ' is considered a white space. However, since byte in Java
+ * is signed, we add a check to make sure the value is not negative.
+ */
+ static final boolean isWhite(byte b){
+ return b <= ' ' && b > -1;
+ }
+
+ /**
+ * Inform the output interface to indicate we are starting a new record batch
+ */
+ public void resetForNextBatch() { }
+
+ public long getPos() { return input.getPos(); }
+
+ /**
+ * Function encapsulates parsing an entire record; it delegates parsing of the
+ * fields to the parseField() function.
+ * We mark the start of the record and, if any failures are encountered
+ * (OOM, for example), we reset the input stream to the marked position.
+ * @return true if parsing this record was successful; false otherwise
+ * @throws IOException for input file read errors
+ */
+ private boolean parseRecord() throws IOException {
+ final byte newLine = this.newLine;
+ final TextInput input = this.input;
+
+ input.mark();
+
+ fieldIndex = 0;
+ if (ignoreLeadingWhitespace && isWhite(ch)) {
+ skipWhitespace();
+ }
+
+ output.startRecord();
+ int fieldsWritten = 0;
+ try {
+ @SuppressWarnings("unused")
+ boolean earlyTerm = false;
+ while (ch != newLine) {
+ earlyTerm = ! parseField();
+ fieldsWritten++;
+ if (ch != newLine) {
+ ch = input.nextChar();
+ if (ch == newLine) {
+ output.startField(fieldsWritten++);
+ output.endEmptyField();
+ break;
+ }
+ }
+
+ // Disabling early termination. See DRILL-5914
+
+// if (earlyTerm) {
+// if (ch != newLine) {
+// input.skipLines(1);
+// }
+// break;
+// }
+ }
+ output.finishRecord();
+ } catch (StreamFinishedPseudoException e) {
+
+ // if we've written part of a field or all of a field, we should send this row.
+
+ if (fieldsWritten == 0) {
+ throw e;
+ } else {
+ output.finishRecord();
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Function parses an individual field and ignores any white spaces encountered
+ * by not appending them to the output vector
+ * @throws IOException for input file read errors
+ */
+ private void parseValueIgnore() throws IOException {
+ final byte newLine = this.newLine;
+ final byte delimiter = this.delimiter;
+ final TextInput input = this.input;
+
+ byte ch = this.ch;
+ while (ch != delimiter && ch != newLine) {
+ appendIgnoringWhitespace(ch);
+ ch = input.nextChar();
+ }
+ this.ch = ch;
+ }
+
+ public void appendIgnoringWhitespace(byte data) {
+ if (! isWhite(data)) {
+ output.append(data);
+ }
+ }
+
+ /**
+ * Function parses an individual field and appends all characters up to the delimiter (or newline)
+ * to the output, including white spaces
+ * @throws IOException for input file read errors
+ */
+ private void parseValueAll() throws IOException {
+ final byte newLine = this.newLine;
+ final byte delimiter = this.delimiter;
+ final TextOutput output = this.output;
+ final TextInput input = this.input;
+
+ byte ch = this.ch;
+ while (ch != delimiter && ch != newLine) {
+ output.append(ch);
+ ch = input.nextChar();
+ }
+ this.ch = ch;
+ }
+
+ /**
+ * Function simply delegates the parsing of a single field to the actual
+ * implementation based on parsing config
+ *
+ * @throws IOException
+ * for input file read errors
+ */
+ private void parseValue() throws IOException {
+ if (ignoreTrailingWhitespace) {
+ parseValueIgnore();
+ } else {
+ parseValueAll();
+ }
+ }
+
+ /**
+ * Recursive function invoked when a quote is encountered. Function also
+ * handles the case when there are non-white space characters in the field
+ * after the quoted value.
+ * @param prev previous byte read
+ * @throws IOException for input file read errors
+ */
+ private void parseQuotedValue(byte prev) throws IOException {
+ final byte newLine = this.newLine;
+ final byte delimiter = this.delimiter;
+ final TextOutput output = this.output;
+ final TextInput input = this.input;
+ final byte quote = this.quote;
+
+ ch = input.nextCharNoNewLineCheck();
+
+ while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
+ if (ch != quote) {
+ if (prev == quote) { // unescaped quote detected
+ if (parseUnescapedQuotes) {
+ output.append(quote);
+ output.append(ch);
+ parseQuotedValue(ch);
+ break;
+ } else {
+ throw new TextParsingException(
+ context,
+ "Unescaped quote character '"
+ + quote
+ + "' inside quoted value of CSV field. To allow unescaped quotes, "
+ + "set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. "
+ + "Cannot parse CSV input.");
+ }
+ }
+ output.append(ch);
+ prev = ch;
+ } else if (prev == quoteEscape) {
+ output.append(quote);
+ prev = NULL_BYTE;
+ } else {
+ prev = ch;
+ }
+ ch = input.nextCharNoNewLineCheck();
+ }
+
+ // Handles whitespace after quoted value:
+ // Whitespace characters (i.e., ch <= ' ') are ignored if they are not used as the delimiter (i.e., ch != delimiter)
+ // For example, in tab-separated files (TSV files), '\t' is used as delimiter and should not be ignored
+ // Content after whitespace may be parsed if 'parseUnescapedQuotes' is enabled.
+ if (ch != newLine && ch <= ' ' && ch != delimiter) {
+ final DrillBuf workBuf = this.workBuf;
+ workBuf.resetWriterIndex();
+ do {
+ // saves whitespace after value
+ workBuf.writeByte(ch);
+ ch = input.nextChar();
+ // found a new line, go to next record.
+ if (ch == newLine) {
+ return;
+ }
+ } while (ch <= ' ' && ch != delimiter);
+
+ // there's more stuff after the quoted value, not only empty spaces.
+ if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) {
+
+ output.append(quote);
+ for(int i =0; i < workBuf.writerIndex(); i++){
+ output.append(workBuf.getByte(i));
+ }
+ // the next character is not the escape character, put it there
+ if (ch != quoteEscape) {
+ output.append(ch);
+ }
+ // sets this character as the previous character (may be escaping)
+ // calls recursively to keep parsing potentially quoted content
+ parseQuotedValue(ch);
+ }
+ }
+
+ if (!(ch == delimiter || ch == newLine)) {
+ throw new TextParsingException(context, "Unexpected character '" + ch
+ + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input.");
+ }
+ }
+
+ /**
+ * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function
+ * @return true if more rows can be read, false if not
+ * @throws IOException for input file read errors
+ */
+ private final boolean parseField() throws IOException {
+
+ output.startField(fieldIndex++);
+
+ if (isWhite(ch) && ignoreLeadingWhitespace) {
+ skipWhitespace();
+ }
+
+ // Delimiter? Then this is an empty field.
+
+ if (ch == delimiter) {
+ return output.endEmptyField();
+ }
+
+ // Have the first character of the field. Parse and save the
+ // field, even if we hit EOF. (An EOF identifies a last line
+ // that contains data, but is not terminated with a newline.)
+
+ try {
+ if (ch == quote) {
+ parseQuotedValue(NULL_BYTE);
+ } else {
+ parseValue();
+ }
+ return output.endField();
+ } catch (StreamFinishedPseudoException e) {
+ return output.endField();
+ }
+ }
+
+ /**
+ * Helper function to skip white spaces occurring at the current input stream.
+ * @throws IOException for input file read errors
+ */
+ private void skipWhitespace() throws IOException {
+ final byte delimiter = this.delimiter;
+ final byte newLine = this.newLine;
+ final TextInput input = this.input;
+
+ while (isWhite(ch) && ch != delimiter && ch != newLine) {
+ ch = input.nextChar();
+ }
+ }
+
+ /**
+ * Starting point for the reader. Sets up the input interface.
+ * @throws IOException for input file read errors
+ */
+ public final void start() throws IOException {
+ context.stopped = false;
+ input.start();
+ }
+
+ /**
+ * Parses the next record from the input. Will skip the line if it is a comment;
+ * this is required when the file contains headers.
+ * @throws IOException for input file read errors
+ */
+ public final boolean parseNext() throws IOException {
+ try {
+ while (! context.stopped) {
+ ch = input.nextChar();
+ if (ch == comment) {
+ input.skipLines(1);
+ continue;
+ }
+ break;
+ }
+ final long initialLineNumber = input.lineCount();
+ boolean success = parseRecord();
+ if (initialLineNumber + 1 < input.lineCount()) {
+ throw new TextParsingException(context, "Cannot use newline character within quoted string");
+ }
+
+ return success;
+ } catch (UserException ex) {
+ stopParsing();
+ throw ex;
+ } catch (StreamFinishedPseudoException ex) {
+ stopParsing();
+ return false;
+ } catch (Exception ex) {
+ try {
+ throw handleException(ex);
+ } finally {
+ stopParsing();
+ }
+ }
+ }
+
+ private void stopParsing() { }
+
+ private String displayLineSeparators(String str, boolean addNewLine) {
+ if (addNewLine) {
+ if (str.contains("\r\n")) {
+ str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t");
+ } else if (str.contains("\n")) {
+ str = str.replaceAll("\\n", "[\\\\n]\n\t");
+ } else {
+ str = str.replaceAll("\\r", "[\\\\r]\r\t");
+ }
+ } else {
+ str = str.replaceAll("\\n", "\\\\n");
+ str = str.replaceAll("\\r", "\\\\r");
+ }
+ return str;
+ }
+
+ /**
+ * Helper method to handle exceptions caught while processing text files and generate better error messages associated with
+ * the exception.
+ * @param ex Exception raised
+ * @throws IOException for input file read errors
+ */
+ private TextParsingException handleException(Exception ex) throws IOException {
+
+ if (ex instanceof TextParsingException) {
+ throw (TextParsingException) ex;
+ }
+
+ String message = "";  // empty, not null, since context may be appended below
+ String tmp = input.getStringSinceMarkForError();
+ char[] chars = tmp.toCharArray();
+ if (chars != null) {
+ int length = chars.length;
+ if (length > settings.getMaxCharsPerColumn()) {
+ message = "Length of parsed input (" + length
+ + ") exceeds the maximum number of characters defined in your parser settings ("
+ + settings.getMaxCharsPerColumn() + "). ";
+ }
+
+ if (tmp.contains("\n") || tmp.contains("\r")) {
+ tmp = displayLineSeparators(tmp, true);
+ String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false);
+ message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '"
+ + lineSeparator + "'. Parsed content:\n\t" + tmp;
+ }
+
+ int nullCharacterCount = 0;
+ // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError
+ int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length;
+ StringBuilder s = new StringBuilder(maxLength);
+ for (int i = 0; i < maxLength; i++) {
+ if (chars[i] == '\0') {
+ s.append('\\');
+ s.append('0');
+ nullCharacterCount++;
+ } else {
+ s.append(chars[i]);
+ }
+ }
+ tmp = s.toString();
+
+ if (nullCharacterCount > 0) {
+ message += "\nIdentified "
+ + nullCharacterCount
+ + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t"
+ + tmp;
+ }
+ }
+
+ UserException.Builder builder;
+ if (ex instanceof UserException) {
+ builder = ((UserException) ex).rebuild();
+ } else {
+ builder = UserException
+ .dataReadError(ex)
+ .message(message);
+ }
+ throw builder
+ .addContext("Line", context.currentLine())
+ .addContext("Record", context.currentRecord())
+ .build(logger);
+ }
+
+ /**
+ * Finish the processing of a batch, indicates to the output
+ * interface to wrap up the batch
+ */
+ public void finishBatch() { }
+
+ /**
+ * Invoked once there are no more records and we are done with the
+ * current record reader to clean up state.
+ * @throws IOException for input file read errors
+ */
+ public void close() throws IOException {
+ input.close();
+ }
+}
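
The reader lifecycle is start(), repeated parseNext() calls until it returns false, then close(). A sketch of the driver loop, under the assumption that the input, output, and work buffer were built by the surrounding scan framework and that the caller lives in this package (TextOutput is package-private):

    import java.io.IOException;

    // Sketch of the TextReader calling convention; the reader is assumed
    // to be fully constructed by the surrounding scan framework.
    class TextReaderDriverSketch {
      static void readAll(TextReader reader) throws IOException {
        reader.start();                    // position input, honoring splits
        while (reader.parseNext()) {       // one record per call; false at EOF
          if (reader.getOutput().isFull()) {
            reader.finishBatch();          // batch boundary; caller ships the batch
          }
        }
        reader.close();
      }
    }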
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
new file mode 100644
index 000000000..aced5adfd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Version 3 of the text reader. Hosts the "compliant" text reader on
+ * the row set framework.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a379db175..37b8a4073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -593,7 +593,6 @@ public class Foreman implements Runnable {
new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
}
-
/**
* Manages the end-state processing for Foreman.
*
@@ -726,7 +725,6 @@ public class Foreman implements Runnable {
}
}
- @SuppressWarnings("resource")
@Override
public void close() {
Preconditions.checkState(!isClosed);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7e9415530..5443eea5f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -522,6 +522,8 @@ drill.exec.options: {
exec.queue.memory_reserve_ratio: 0.2,
exec.sort.disable_managed : false,
exec.storage.enable_new_text_reader: true,
+ exec.storage.enable_v3_text_reader: false,
+ exec.storage.min_width: 1,
exec.udf.enable_dynamic_support: true,
exec.udf.use_dynamic: true,
drill.exec.stats.logging.batch_size: false,