author     Paul Rogers <progers@cloudera.com>  2019-02-23 17:48:21 -0800
committer  Arina Ielchiieva <arina.yelchiyeva@gmail.com>  2019-03-11 11:48:37 +0200
commit     d585452b52e94a91ae76a24550c5c476847a9cba (patch)
tree       4e91dc0d55bfb5efb93348a323645d5d4ebced40
parent     4a79e2a52ad2dac64ba001645da1442f0b06fd62 (diff)
DRILL-6952: Host compliant text reader on the row set framework
The result set loader allows controlling batch sizes. The new scan framework, built on top of the result set loader, handles projection, implicit columns, null columns, and more. This commit converts the "new" ("compliant") text reader to use the new framework. Session options select either the V2 ("new") or V3 (row-set based) version. Unit tests demonstrate V3 functionality. Closes #1683
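
For illustration (a hedged sketch, not part of this commit): the V3 reader is selected per session with the new exec.storage.enable_v3_text_reader option added below. The example uses the standard JDBC API; the connection URL and file path are placeholder assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class EnableV3TextReaderExample {
  public static void main(String[] args) throws Exception {
    // Placeholder embedded-mode URL; any Drill JDBC connection works the same way.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
         Statement stmt = conn.createStatement()) {
      // Route text/CSV scans through the V3 (row-set based) reader for this session.
      stmt.execute("ALTER SESSION SET `exec.storage.enable_v3_text_reader` = true");
      try (ResultSet rs = stmt.executeQuery("SELECT * FROM dfs.`/tmp/example.csv` LIMIT 10")) {
        while (rs.next()) {
          System.out.println(rs.getString(1));
        }
      }
    }
  }
}
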
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java  8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java  4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java  3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java  1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java  32
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java  8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java  14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java  13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java  84
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java  4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java  16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java  36
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java  3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java  10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java  57
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java  1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java  442
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java  52
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java  44
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java  301
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java  27
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java  165
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java  269
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java  62
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java  267
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java  137
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java  29
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java  368
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java  88
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java  126
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java  305
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java  508
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java  22
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java  2
-rw-r--r--  exec/java-exec/src/main/resources/drill-module.conf  2
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java  1
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java  3
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java  3
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java  3
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java  29
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java  9
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java  8
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java  15
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java  12
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java  2
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java  8
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java  101
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java  271
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java  114
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java  873
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java  397
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java  91
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java  375
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java  2
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java  2
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java  2
59 files changed, 5141 insertions, 696 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 93adda17c..25ac2c9f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -713,6 +713,14 @@ public final class ExecConstants {
public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY,
new OptionDescription("Enables the text reader that complies with the RFC 4180 standard for text/csv files."));
+ public static final String ENABLE_V3_TEXT_READER_KEY = "exec.storage.enable_v3_text_reader";
+ public static final OptionValidator ENABLE_V3_TEXT_READER = new BooleanValidator(ENABLE_V3_TEXT_READER_KEY,
+ new OptionDescription("Enables the row set based version of the text/csv reader."));
+
+ public static final String MIN_READER_WIDTH_KEY = "exec.storage.min_width";
+ public static final OptionValidator MIN_READER_WIDTH = new LongValidator(MIN_READER_WIDTH_KEY,
+ new OptionDescription("Min width for text readers, mostly for testing."));
+
public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
index 84aebdcd0..ebd7288a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
@@ -31,7 +31,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
public abstract class AbstractExchange extends AbstractSingle implements Exchange {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class);
// Ephemeral info for generating execution fragments.
protected int senderMajorFragmentId;
@@ -77,7 +76,7 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
}
/**
- * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrances
+ * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrences
* in given endpoint list.
*
* @param fragmentEndpoints Drillbit endpoint assignments of fragments.
@@ -111,7 +110,6 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
setupSenders(senderLocations);
}
-
@Override
public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
this.receiverMajorFragmentId = majorFragmentId;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
index 3bb1e5403..a5229f9d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.drill.exec.store.dfs.FileSelection;
public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class);
public AbstractFileGroupScan(String userName) {
super(userName);
@@ -46,5 +45,4 @@ public abstract class AbstractFileGroupScan extends AbstractGroupScan implements
public boolean supportsPartitionFilterPushdown() {
return true;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
index d744db424..db4c73d9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
@@ -25,8 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-public abstract class AbstractSubScan extends AbstractBase implements SubScan{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
+public abstract class AbstractSubScan extends AbstractBase implements SubScan {
public AbstractSubScan(String userName) {
super(userName);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 063e26bef..1abc3d804 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -499,7 +499,6 @@ public class ScanBatch implements CloseableRecordBatch {
schemaChanged = false;
}
- @SuppressWarnings("resource")
private <T extends ValueVector> T addField(MaterializedField field,
Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
Map<String, ValueVector> fieldVectorMap;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index c9d97bf72..436aea06b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -42,7 +42,6 @@ import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -77,21 +76,21 @@ import java.util.List;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
+
+ private static final String EMPTY_STRING = "";
+
private Projector projector;
private List<ValueVector> allocationVectors;
private List<ComplexWriter> complexWriters;
private List<FieldReference> complexFieldReferencesList;
private boolean hasRemainder = false;
- private int remainderIndex = 0;
+ private int remainderIndex;
private int recordCount;
-
private ProjectMemoryManager memoryManager;
-
-
- private static final String EMPTY_STRING = "";
private boolean first = true;
private boolean wasNone = false; // whether a NONE iter outcome was already seen
+ private ColumnExplorer columnExplorer;
private class ClassifierResult {
public boolean isStar = false;
@@ -114,6 +113,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
+ columnExplorer = new ColumnExplorer(context.getOptions());
}
@Override
@@ -121,14 +121,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return recordCount;
}
-
@Override
protected void killIncoming(final boolean sendUpstream) {
super.killIncoming(sendUpstream);
hasRemainder = false;
}
-
@Override
public IterOutcome innerNext() {
if (wasNone) {
@@ -145,7 +143,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
@Override
public VectorContainer getOutgoingContainer() {
- return this.container;
+ return container;
}
@Override
@@ -204,14 +202,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
incomingRecordCount = incoming.getRecordCount();
memoryManager.update();
- logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}",
+ logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}",
memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this);
}
}
}
if (complexWriters != null && getLastKnownOutcome() == EMIT) {
- throw new UnsupportedOperationException("Currently functions producing complex types as output is not " +
+ throw new UnsupportedOperationException("Currently functions producing complex types as output are not " +
"supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
"function in the projection list of outermost query.");
}
@@ -219,7 +217,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
first = false;
container.zeroVectors();
-
int maxOuputRecordCount = memoryManager.getOutputRowCount();
logger.trace("doWork():[2] memMgr RC {}, incoming rc {}, incoming {}, project {}",
memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this);
@@ -233,7 +230,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
long projectEndTime = System.currentTimeMillis();
logger.trace("doWork(): projection: records {}, time {} ms", outputRecords, (projectEndTime - projectStartTime));
-
if (outputRecords < incomingRecordCount) {
setValueCount(outputRecords);
hasRemainder = true;
@@ -277,7 +273,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final int projRecords = projector.projectRecords(this.incoming, remainderIndex, recordsToProcess, 0);
long projectEndTime = System.currentTimeMillis();
- logger.trace("handleRemainder: projection: " + "records {}, time {} ms", projRecords,(projectEndTime - projectStartTime));
+ logger.trace("handleRemainder: projection: records {}, time {} ms", projRecords,(projectEndTime - projectStartTime));
if (projRecords < remainingRecordCount) {
setValueCount(projRecords);
@@ -463,7 +459,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
memoryManager.addNewField(vv, write);
- final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+ cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
}
}
continue;
@@ -546,7 +542,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
- final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+ cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
memoryManager.addNewField(ouputVector, write);
// We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
@@ -590,7 +586,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
private boolean isImplicitFileColumn(ValueVector vvIn) {
- return ColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null;
+ return columnExplorer.isImplicitFileColumn(vvIn.getField().getName());
}
private List<NamedExpression> getExpressionList() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index 966a03901..b9b3e782e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -24,7 +24,7 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.Scan
import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl;
import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput;
+import org.apache.drill.exec.store.easy.text.compliant.v3.TextReader;
/**
* Parses the `columns` array. Doing so is surprisingly complex.
@@ -113,14 +113,14 @@ public class ColumnsArrayParser implements ScanProjectionParser {
if (inCol.isArray()) {
int maxIndex = inCol.maxIndex();
- if (maxIndex > RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS) {
+ if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
throw UserException
.validationError()
.message(String.format(
"`columns`[%d] index out of bounds, max supported size is %d",
- maxIndex, RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS))
+ maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS))
.addContext("Column", inCol.name())
- .addContext("Maximum index", RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)
+ .addContext("Maximum index", TextReader.MAXIMUM_NUMBER_COLUMNS)
.build(logger);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
index 8352dfa07..1166a5234 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
@@ -69,6 +69,7 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
private List<FileSplit> spilts = new ArrayList<>();
private Iterator<FileSplit> splitIter;
private Path scanRootDir;
+ private int partitionDepth;
protected DrillFileSystem dfs;
private FileMetadataManager metadataManager;
@@ -82,12 +83,18 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
/**
* Specify the selection root for a directory scan, if any.
- * Used to populate partition columns.
+ * Used to populate partition columns. Also, specify the maximum
+ * partition depth.
+ *
* @param rootPath Hadoop file path for the directory
+ * @param partitionDepth maximum partition depth across all files
+ * within this logical scan operator (files in this scan may be
+ * shallower)
*/
- public void setSelectionRoot(Path rootPath) {
+ public void setSelectionRoot(Path rootPath, int partitionDepth) {
this.scanRootDir = rootPath;
+ this.partitionDepth = partitionDepth;
}
@Override
@@ -122,7 +129,10 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
metadataManager = new FileMetadataManager(
context.getFragmentContext().getOptions(),
+ true, // Expand partition columns with wildcard
+ false, // Put partition columns after table columns
scanRootDir,
+ partitionDepth,
paths);
scanOrchestrator.withMetadata(metadataManager);
}
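
For illustration (an editor-added sketch, mirroring the ScanFrameworkCreator.buildScan() code that appears later in this patch): a plugin passes the selection root together with the plan-time partition depth so every reader in the scan exposes the same dir0..dirN columns. The helper class and method below are assumptions; the framework and sub-scan methods come from this patch.

import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.fs.Path;

class SelectionRootSketch {
  // Hypothetical helper: wire the root and partition depth into a framework built elsewhere.
  static void configure(BaseFileScanFramework<?> framework, EasySubScan scan) {
    Path selectionRoot = scan.getSelectionRoot();
    if (selectionRoot != null) {
      // The depth is computed once at plan time; files in this scan may be shallower.
      framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth());
    }
  }
}
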
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
index ae8502b52..a4ace55be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
@@ -46,6 +46,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
private boolean hasImplicitCols;
+ private boolean expandPartitionsAtEnd;
+
public FileMetadataColumnsParser(FileMetadataManager metadataManager) {
this.metadataManager = metadataManager;
partitionPattern = Pattern.compile(metadataManager.partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE);
@@ -123,8 +125,10 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
}
private void buildWildcard() {
- if (metadataManager.useLegacyWildcardExpansion &&
- metadataManager.useLegacyExpansionLocation) {
+ if (!metadataManager.useLegacyWildcardExpansion) {
+ return;
+ }
+ if (metadataManager.useLegacyExpansionLocation) {
// Star column: this is a SELECT * query.
@@ -134,6 +138,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
// set is constant across all files.
expandPartitions();
+ } else {
+ expandPartitionsAtEnd = true;
}
}
@@ -144,8 +150,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
// feature to expand partitions for wildcards, and we want the
// partitions after data columns.
- if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion &&
- ! metadataManager.useLegacyExpansionLocation) {
+ if (expandPartitionsAtEnd) {
expandPartitions();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
index ba49a9f54..201d30859 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
@@ -41,24 +41,51 @@ import org.apache.hadoop.fs.Path;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+/**
+ * Manages the insertion of file metadata (AKA "implicit" and partition) columns.
+ * Parses the file metadata columns from the projection list. Creates and loads
+ * the vectors that hold the data. If running in legacy mode, inserts partition
+ * columns when the query contains a wildcard. Supports renaming the columns via
+ * session options.
+ * <p>
+ * The lifecycle is that the manager is given the set of files for this scan
+ * operator so it can determine the partition depth. (Note that different scans
+ * may not agree on the depth. This is a known issue with Drill's implementation.)
+ * <p>
+ * Then, at the start of the scan, all projection columns are parsed. This class
+ * picks out the file metadata columns.
+ * <p>
+ * On each file (on each reader), the columns are "resolved." Here, that means
+ * that the columns are filled in with actual values based on the present file.
+ * <p>
+ * This is the successor to {@link ColumnExplorer}.
+ */
+
public class FileMetadataManager implements MetadataManager, SchemaProjectionResolver, VectorSource {
+ /**
+ * Automatically compute partition depth from files. Use only
+ * for testing!
+ */
+
+ public static final int AUTO_PARTITION_DEPTH = -1;
+
// Input
- private Path scanRootDir;
+ private final Path scanRootDir;
+ private final int partitionCount;
private FileMetadata currentFile;
// Config
protected final String partitionDesignator;
- protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>();
- protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap();
+ protected final List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>();
+ protected final Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap();
/**
* Indicates whether to expand partition columns when the query contains a wildcard.
* Supports queries such as the following:<code><pre>
- * select * from dfs.`partitioned-dir`
- * </pre><code>
+ * select * from dfs.`partitioned-dir`</pre></code>
* In which the output columns will be (columns, dir0) if the partitioned directory
* has one level of nesting.
*
@@ -86,7 +113,6 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
private final List<MetadataColumn> metadataColumns = new ArrayList<>();
private ConstantColumnLoader loader;
private VectorContainer outputContainer;
- private final int partitionCount;
/**
* Specifies whether to plan based on the legacy meaning of "*". See
@@ -111,10 +137,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
public FileMetadataManager(OptionSet optionManager,
boolean useLegacyWildcardExpansion,
boolean useLegacyExpansionLocation,
- Path rootDir, List<Path> files) {
+ Path rootDir,
+ int partitionCount,
+ List<Path> files) {
this.useLegacyWildcardExpansion = useLegacyWildcardExpansion;
this.useLegacyExpansionLocation = useLegacyExpansionLocation;
- scanRootDir = rootDir;
partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
for (ImplicitFileColumns e : ImplicitFileColumns.values()) {
@@ -129,28 +156,32 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
// The files and root dir are optional.
- if (scanRootDir == null || files == null) {
- partitionCount = 0;
+ if (rootDir == null || files == null) {
+ scanRootDir = null;
+ this.partitionCount = 0;
// Special case in which the file is the same as the
// root directory (occurs for a query with only one file.)
- } else if (files.size() == 1 && scanRootDir.equals(files.get(0))) {
+ } else if (files.size() == 1 && rootDir.equals(files.get(0))) {
scanRootDir = null;
- partitionCount = 0;
+ this.partitionCount = 0;
} else {
+ scanRootDir = rootDir;
- // Compute the partitions.
+ // Compute the partitions. Normally the count is passed in.
+ // But, handle the case where the count is unknown. Note: use this
+ // convenience only in testing since, in production, it can result
+ // in different scans reporting different numbers of partitions.
- partitionCount = computeMaxPartition(files);
+ if (partitionCount == -1) {
+ this.partitionCount = computeMaxPartition(files);
+ } else {
+ this.partitionCount = partitionCount;
+ }
}
}
- public FileMetadataManager(OptionSet optionManager,
- Path rootDir, List<Path> files) {
- this(optionManager, false, false, rootDir, files);
- }
-
private int computeMaxPartition(List<Path> files) {
int maxLen = 0;
for (Path filePath : files) {
@@ -165,6 +196,17 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
this.vectorCache = vectorCache;
}
+ /**
+ * Returns the file metadata column parser that:
+ * <ul>
+ * <li>Picks out the file metadata and partition columns,</li>
+ * <li>Inserts partition columns for a wildcard query, if the
+ * option to do so is set.</li>
+ * </ul>
+ *
+ * @see {@link #useLegacyWildcardExpansion}
+ */
+
@Override
public ScanProjectionParser projectionParser() { return parser; }
@@ -223,6 +265,10 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
currentFile = null;
}
+ /**
+ * Resolves metadata columns to concrete, materialized columns with the
+ * proper value for the present file.
+ */
@Override
public boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple,
TupleMetadata tableSchema) {
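
For illustration (an editor-added sketch patterned on the BaseFileScanFramework change above; the helper class and import paths are assumptions): constructing the manager with the revised six-argument constructor. Passing AUTO_PARTITION_DEPTH (-1) triggers the test-only fallback that computes the depth from the file list.

import java.util.List;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.hadoop.fs.Path;

class MetadataManagerSketch {
  static FileMetadataManager create(OptionSet options, Path scanRootDir,
      int partitionDepth, List<Path> paths) {
    return new FileMetadataManager(
        options,
        true,            // legacy behavior: expand partition columns for a wildcard query
        false,           // place the expanded partition columns after the table columns
        scanRootDir,
        partitionDepth,  // pass FileMetadataManager.AUTO_PARTITION_DEPTH only in tests
        paths);
  }
}
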
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index a5d6ca2a1..926936550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -282,7 +282,9 @@ public class ScanSchemaOrchestrator {
ScanProjectionParser parser = metadataManager.projectionParser();
if (parser != null) {
- parsers.add(parser);
+ // Insert in first position so that it is ensured to see
+ // any wildcard that exists
+ parsers.add(0, parser);
}
// Parse the projection list.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
index f109578e9..be45d569e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
@@ -87,17 +87,17 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
"width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth());
// 1.5 Cap the parallelization width by max allowed parallelization per node
- width = Math.max(1, Math.min(width, endpointPool.size()*parameters.getMaxWidthPerNode()));
+ width = Math.max(1, Math.min(width, endpointPool.size() * parameters.getMaxWidthPerNode()));
- // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is if we the width is more,
- // we end up allocating more work units to one or more endpoints that don't have those many work units.
+ // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is if the width is more,
+ // we end up allocating more work units to one or more endpoints that don't have that many work units.
width = Math.min(totalMaxWidth, width);
// Step 2: Select the endpoints
final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
// 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
- for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
+ for (Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
endpoints.put(entry.getKey(), 1);
}
int totalAssigned = endpoints.size();
@@ -105,15 +105,15 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
// 2.2 Assign the remaining slots to endpoints proportional to the affinity of each endpoint
int remainingSlots = width - endpoints.size();
while (remainingSlots > 0) {
- for(EndpointAffinity epAf : endpointPool.values()) {
+ for (EndpointAffinity epAf : endpointPool.values()) {
final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots);
int currentAssignments = endpoints.get(epAf.getEndpoint());
- for(int i=0;
- i < moreAllocation &&
+ for (int i=0;
+ i < moreAllocation &&
totalAssigned < width &&
currentAssignments < parameters.getMaxWidthPerNode() &&
currentAssignments < epAf.getMaxWidth();
- i++) {
+ i++) {
totalAssigned++;
currentAssignments++;
}
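
A small worked example (added by the editor, illustrative numbers only) of the capping rules in steps 1.5 and 1.6 above:

public class WidthCapExample {
  public static void main(String[] args) {
    int requestedWidth = 40;
    int endpointPoolSize = 3;     // Drillbits with required (hard) affinity
    int maxWidthPerNode = 8;
    int totalMaxWidth = 20;       // sum of per-endpoint max widths

    // 1.5: never exceed poolSize * maxWidthPerNode (3 * 8 = 24), never go below 1.
    int width = Math.max(1, Math.min(requestedWidth, endpointPoolSize * maxWidthPerNode));
    // 1.6: never exceed the total of the per-endpoint limits (20).
    width = Math.min(totalMaxWidth, width);
    System.out.println(width);    // prints 20
  }
}
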
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 8de57bca0..39b699fe4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -179,7 +179,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) {
planningSet.get(rootFragment);
- for(ExchangeFragmentPair fragmentPair : rootFragment) {
+ for (ExchangeFragmentPair fragmentPair : rootFragment) {
initFragmentWrappers(fragmentPair.getNode(), planningSet);
}
}
@@ -193,7 +193,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
// Set up dependency of fragments based on the affinity of exchange that separates the fragments.
- for(Wrapper currentFragmentWrapper : planningSet) {
+ for (Wrapper currentFragmentWrapper : planningSet) {
ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair();
if (sendingExchange != null) {
ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency();
@@ -209,17 +209,17 @@ public class SimpleParallelizer implements ParallelizationParameters {
// Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for
// parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and
- // remove the fragment on which the current fragment depends on.
+ // remove the fragment on which the current fragment depends.
final Set<Wrapper> roots = Sets.newHashSet();
- for(Wrapper w : planningSet) {
+ for (Wrapper w : planningSet) {
roots.add(w);
}
- for(Wrapper wrapper : planningSet) {
+ for (Wrapper wrapper : planningSet) {
final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
- for(Wrapper dependency : fragmentDependencies) {
+ for (Wrapper dependency : fragmentDependencies) {
if (roots.contains(dependency)) {
roots.remove(dependency);
}
@@ -241,7 +241,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
return;
}
- // First parallelize fragments on which this fragment depends on.
+ // First parallelize fragments on which this fragment depends.
final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
for(Wrapper dependency : fragmentDependencies) {
@@ -288,20 +288,20 @@ public class SimpleParallelizer implements ParallelizationParameters {
Preconditions.checkArgument(op instanceof FragmentRoot);
FragmentRoot root = (FragmentRoot) op;
- FragmentHandle handle = FragmentHandle //
- .newBuilder() //
- .setMajorFragmentId(wrapper.getMajorFragmentId()) //
- .setMinorFragmentId(minorFragmentId) //
- .setQueryId(queryId) //
+ FragmentHandle handle = FragmentHandle
+ .newBuilder()
+ .setMajorFragmentId(wrapper.getMajorFragmentId())
+ .setMinorFragmentId(minorFragmentId)
+ .setQueryId(queryId)
.build();
- PlanFragment fragment = PlanFragment.newBuilder() //
- .setForeman(foremanNode) //
- .setHandle(handle) //
- .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) //
- .setLeafFragment(isLeafFragment) //
+ PlanFragment fragment = PlanFragment.newBuilder()
+ .setForeman(foremanNode)
+ .setHandle(handle)
+ .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId))
+ .setLeafFragment(isLeafFragment)
.setContext(queryContextInfo)
- .setMemInitial(wrapper.getInitialAllocation())//
+ .setMemInitial(wrapper.getInitialAllocation())
.setMemMax(wrapper.getMaxAllocation())
.setCredentials(session.getCredentials())
.addAllCollector(CountRequiredFragments.getCollectors(root))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 760a6ba5b..a07d3ed58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -111,7 +111,7 @@ public class ScanPrel extends DrillScanRelBase implements Prel, HasDistributionA
final ScanStats stats = this.getGroupScan().getScanStats(settings);
final int columnCount = this.getRowType().getFieldCount();
- if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
index b4fada61f..4d702a830 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
@@ -30,8 +30,8 @@ public class ScanPrule extends Prule{
public ScanPrule() {
super(RelOptHelper.any(DrillScanRel.class), "Prel.ScanPrule");
-
}
+
@Override
public void onMatch(RelOptRuleCall call) {
final DrillScanRel scan = (DrillScanRel) call.rel(0);
@@ -48,5 +48,4 @@ public class ScanPrule extends Prule{
call.transformTo(newScan);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index fa8e69d0b..267cecd4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -159,13 +159,13 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
class MajorFragmentStat {
private DistributionAffinity distributionAffinity = DistributionAffinity.NONE;
- private double maxRows = 0d;
+ private double maxRows;
private int maxWidth = Integer.MAX_VALUE;
- private boolean isMultiSubScan = false;
- private boolean rightSideOfLateral = false;
+ private boolean isMultiSubScan;
+ private boolean rightSideOfLateral;
//This flag if true signifies that all the Rels thus far
//are simple rels with no distribution requirement.
- private boolean isSimpleRel = false;
+ private boolean isSimpleRel;
public void add(Prel prel) {
maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -196,7 +196,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
return false;
}
- int suggestedWidth = (int) Math.ceil((maxRows+1)/targetSliceSize);
+ int suggestedWidth = (int) Math.ceil((maxRows + 1) / targetSliceSize);
int w = Math.min(maxWidth, suggestedWidth);
if (w < 1) {
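
A quick arithmetic check (editor-added, illustrative numbers only) of the suggested-width formula above:

public class SuggestedWidthExample {
  public static void main(String[] args) {
    double maxRows = 1_000_000d;
    double targetSliceSize = 100_000d;
    int maxWidth = 8;

    int suggestedWidth = (int) Math.ceil((maxRows + 1) / targetSliceSize); // ceil(10.00001) = 11
    int w = Math.min(maxWidth, suggestedWidth);                            // capped to 8
    System.out.println(suggestedWidth + " -> " + w);                       // prints "11 -> 8"
  }
}
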
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 9cd670e99..e201e2686 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -242,6 +242,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR),
new OptionDefinition(ExecConstants.SCALAR_REPLACEMENT_VALIDATOR),
new OptionDefinition(ExecConstants.ENABLE_NEW_TEXT_READER),
+ new OptionDefinition(ExecConstants.ENABLE_V3_TEXT_READER),
+ new OptionDefinition(ExecConstants.MIN_READER_WIDTH),
new OptionDefinition(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST),
new OptionDefinition(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE),
new OptionDefinition(ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index 33b500018..1ae2d5e9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -55,14 +55,30 @@ public class ColumnExplorer {
*/
public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
- this.columns = columns;
- this.isStarQuery = columns != null && Utilities.isStarQuery(columns);
this.selectedPartitionColumns = Lists.newArrayList();
this.tableColumns = Lists.newArrayList();
this.allImplicitColumns = initImplicitFileColumns(optionManager);
this.selectedImplicitColumns = CaseInsensitiveMap.newHashMap();
+ if (columns == null) {
+ isStarQuery = false;
+ this.columns = null;
+ } else {
+ this.columns = columns;
+ this.isStarQuery = Utilities.isStarQuery(columns);
+ init();
+ }
+ }
- init();
+ /**
+ * Constructor for using the column explorer to probe existing columns in the
+ * {@link ProjectRecordBatch}.
+ */
+ // TODO: This is awkward. This class is being used for two distinct things:
+ // 1. The definition of the metadata columns, and
+ // 2. The projection of metadata columns in a particular query.
+ // Would be better to separate these two concepts.
+ public ColumnExplorer(OptionManager optionManager) {
+ this(optionManager, null);
}
/**
@@ -123,6 +139,15 @@ public class ColumnExplorer {
return matcher.matches();
}
+ public boolean isImplicitColumn(String name) {
+ return isPartitionColumn(partitionDesignator, name) ||
+ isImplicitFileColumn(name);
+ }
+
+ public boolean isImplicitFileColumn(String name) {
+ return allImplicitColumns.get(name) != null;
+ }
+
/**
* Returns list with partition column names.
* For the case when table has several levels of nesting, max level is chosen.
@@ -132,10 +157,24 @@ public class ColumnExplorer {
* @return list with partition column names.
*/
public static List<String> getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) {
- int partitionsCount = 0;
+ int partitionsCount = getPartitionDepth(selection);
+
+ String partitionColumnLabel = schemaConfig.getOption(
+ ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+ List<String> partitions = new ArrayList<>();
+
+ // generates partition column names: dir0, dir1 etc.
+ for (int i = 0; i < partitionsCount; i++) {
+ partitions.add(partitionColumnLabel + i);
+ }
+ return partitions;
+ }
+
+ public static int getPartitionDepth(FileSelection selection) {
// a depth of table root path
int rootDepth = selection.getSelectionRoot().depth();
+ int partitionsCount = 0;
for (Path file : selection.getFiles()) {
// Calculates partitions count for the concrete file:
// depth of file path - depth of table root path - 1.
@@ -145,15 +184,7 @@ public class ColumnExplorer {
// max depth of files path should be used to handle all partitions
partitionsCount = Math.max(partitionsCount, currentPartitionsCount);
}
-
- String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
- List<String> partitions = new ArrayList<>();
-
- // generates partition column names: dir0, dir1 etc.
- for (int i = 0; i < partitionsCount; i++) {
- partitions.add(partitionColumnLabel + i);
- }
- return partitions;
+ return partitionsCount;
}
/**
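
For illustration (an editor-added example using assumed paths and plain ints rather than Drill's FileSelection): how getPartitionDepth() and getPartitionColumnNames() above combine to produce dir0, dir1, ... for the deepest file in the selection.

public class PartitionNameExample {
  public static void main(String[] args) {
    // Assume root /data/sales with files /data/sales/2018/Q1/a.csv and /data/sales/2019/b.csv.
    int rootDepth = 2;          // depth of /data/sales
    int[] fileDepths = {5, 4};  // depths of the two file paths
    int partitionsCount = 0;
    for (int fileDepth : fileDepths) {
      // depth of file path - depth of table root path - 1, maximized across all files
      partitionsCount = Math.max(partitionsCount, fileDepth - rootDepth - 1);
    }
    for (int i = 0; i < partitionsCount; i++) {
      System.out.println("dir" + i);  // prints dir0, dir1
    }
  }
}
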
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 13017fadc..254cddbe3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -73,5 +73,4 @@ public interface FormatPlugin {
Configuration getFsConf();
DrillbitContext getContext();
String getName();
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d76c6489e..dc1f053a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.drill.shaded.guava.com.google.common.base.Functions;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -39,10 +40,16 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
@@ -55,67 +62,335 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+/**
+ * Base class for various file readers.
+ * <p>
+ * This version provides a bridge between the legacy {@link RecordReader}-style
+ * readers and the newer {@link FileBatchReader} style. Over time, split the
+ * class, or provide a cleaner way to handle the differences.
+ *
+ * @param <T> the format plugin config for this reader
+ */
+
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
- @SuppressWarnings("unused")
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+ /**
+ * Defines the static, programmer-defined options for this plugin. These
+ * options are attributes of how the plugin works. The plugin config,
+ * defined in the class definition, provides user-defined options that can
+ * vary across uses of the plugin.
+ */
+
+ public static class EasyFormatConfig {
+ public BasicFormatMatcher matcher;
+ public boolean readable = true;
+ public boolean writable;
+ public boolean blockSplittable;
+ public boolean compressible;
+ public Configuration fsConf;
+ public List<String> extensions;
+ public String defaultName;
+
+ // Config options that, prior to Drill 1.15, required the plugin to
+ // override methods. Moving forward, plugins should be migrated to
+ // use this simpler form. New plugins should use these options
+ // instead of overriding methods.
+
+ public boolean supportsProjectPushdown;
+ public boolean supportsAutoPartitioning;
+ public int readerOperatorType = -1;
+ public int writerOperatorType = -1;
+ }
+
+ /**
+ * Creates the scan batch to use with the plugin. Drill supports the "classic"
+ * style of scan batch and readers, along with the newer size-aware,
+ * component-based version. The implementation of this class assembles the
+ * readers and scan batch operator as needed for each version.
+ */
+
+ public interface ScanBatchCreator {
+ CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan)
+ throws ExecutionSetupException;
+ }
+
+ /**
+ * Use the original scanner based on the {@link RecordReader} interface.
+ * Requires that the storage plugin roll its own solutions for null columns.
+ * Is not able to limit vector or batch sizes. Retained for backward
+ * compatibility with "classic" format plugins which have not yet been
+ * upgraded to use the new framework.
+ */
+
+ public static class ClassicScanBatchCreator implements ScanBatchCreator {
+
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ClassicScanBatchCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.plugin = plugin;
+ }
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+ final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
+
+ if (! columnExplorer.isStarQuery()) {
+ scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
+ columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth());
+ scan.setOperatorId(scan.getOperatorId());
+ }
+
+ final OperatorContext oContext = context.newOperatorContext(scan);
+ final DrillFileSystem dfs;
+ try {
+ dfs = oContext.newFileSystem(plugin.easyConfig().fsConf);
+ } catch (final IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+ }
+
+ final List<RecordReader> readers = new LinkedList<>();
+ final List<Map<String, String>> implicitColumns = Lists.newArrayList();
+ Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+ final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
+ for (final FileWork work : scan.getWorkUnits()) {
+ final RecordReader recordReader = getRecordReader(
+ plugin, context, dfs, work, scan.getColumns(), scan.getUserName());
+ readers.add(recordReader);
+ final List<String> partitionValues = ColumnExplorer.listPartitionValues(
+ work.getPath(), scan.getSelectionRoot(), false);
+ final Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(
+ work.getPath(), partitionValues, supportsFileImplicitColumns);
+ implicitColumns.add(implicitValues);
+ if (implicitValues.size() > mapWithMaxColumns.size()) {
+ mapWithMaxColumns = implicitValues;
+ }
+ }
+
+ // all readers should have the same number of implicit columns, add missing ones with value null
+ final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+ for (final Map<String, String> map : implicitColumns) {
+ map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+ }
+
+ return new ScanBatch(context, oContext, readers, implicitColumns);
+ }
+
+ /**
+ * Create a record reader given a file system, a file description and other
+ * information. For backward compatibility, calls the plugin method by
+ * default.
+ *
+ * @param plugin
+ * the plugin creating the scan
+ * @param context
+ * fragment context for the fragment running the scan
+ * @param dfs
+ * Drill's distributed file system facade
+ * @param fileWork
+ * description of the file to scan
+ * @param columns
+ * list of columns to project
+ * @param userName
+ * the name of the user performing the scan
+ * @return a scan operator
+ * @throws ExecutionSetupException
+ * if anything goes wrong
+ */
+
+ public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+ FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ return plugin.getRecordReader(context, dfs, fileWork, columns, userName);
+ }
+ }
+
+ /**
+ * Revised scanner based on the revised
+ * {@link ResultSetLoader} and {@link RowBatchReader} classes.
+ * Handles most projection tasks automatically. Able to limit
+ * vector and batch sizes. Use this for new format plugins.
+ */
+
+ public abstract static class ScanFrameworkCreator
+ implements ScanBatchCreator {
+
+ protected EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.plugin = plugin;
+ }
+
+ /**
+ * Builds the revised {@link FileBatchReader}-based scan batch.
+ *
+ * @param context fragment context for the fragment running the scan
+ * @param scan the physical definition of this scan (the Easy sub-scan)
+ * @return the scan batch that wraps the file scan framework
+ * @throws ExecutionSetupException if the scan framework cannot be built
+ */
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context,
+ final EasySubScan scan) throws ExecutionSetupException {
+
+ // Assemble the scan operator and its wrapper.
+
+ try {
+ final BaseFileScanFramework<?> framework = buildFramework(scan);
+ final Path selectionRoot = scan.getSelectionRoot();
+ if (selectionRoot != null) {
+ framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth());
+ }
+ return new OperatorRecordBatch(
+ context, scan,
+ new ScanOperatorExec(
+ framework));
+ } catch (final UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (final Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ /**
+ * Create the plugin-specific framework that manages the scan. The framework
+ * creates batch readers one by one for each file or block. It defines semantic
+ * rules for projection. It handles "early" or "late" schema readers. A typical
+ * framework builds on standardized frameworks for files in general or text
+ * files in particular.
+ *
+ * @param scan the physical operation definition for the scan operation. Contains
+ * one or more files to read. (The Easy format plugin works only for files.)
+ * @return the scan framework which orchestrates the scan operation across
+ * potentially many files
+ * @throws ExecutionSetupException for all setup failures
+ */
+ protected abstract BaseFileScanFramework<?> buildFramework(
+ EasySubScan scan) throws ExecutionSetupException;
+ }
+
+ /**
+ * Generic framework creator for files that just use the basic file
+ * support: metadata, etc. Specialized use cases (special "columns"
+ * column, say) will require a specialized implementation.
+ */
+
+ public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator {
+
+ private final FileReaderFactory readerCreator;
+
+ public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+ FileReaderFactory readerCreator) {
+ super(plugin);
+ this.readerCreator = readerCreator;
+ }
+
+ @Override
+ protected FileScanFramework buildFramework(
+ EasySubScan scan) throws ExecutionSetupException {
- private final BasicFormatMatcher matcher;
+ final FileScanFramework framework = new FileScanFramework(
+ scan.getColumns(),
+ scan.getWorkUnits(),
+ plugin.easyConfig().fsConf,
+ readerCreator);
+ return framework;
+ }
+ }
+
+ private final String name;
+ private final EasyFormatConfig easyConfig;
private final DrillbitContext context;
- private final boolean readable;
- private final boolean writable;
- private final boolean blockSplittable;
- private final Configuration fsConf;
private final StoragePluginConfig storageConfig;
protected final T formatConfig;
- private final String name;
- private final boolean compressible;
+ /**
+ * Legacy constructor.
+ */
protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
- boolean compressible, List<String> extensions, String defaultName){
- this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
- this.readable = readable;
- this.writable = writable;
+ StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable,
+ boolean blockSplittable,
+ boolean compressible, List<String> extensions, String defaultName) {
+ this.name = name == null ? defaultName : name;
+ easyConfig = new EasyFormatConfig();
+ easyConfig.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
+ easyConfig.readable = readable;
+ easyConfig.writable = writable;
this.context = context;
- this.blockSplittable = blockSplittable;
- this.compressible = compressible;
- this.fsConf = fsConf;
+ easyConfig.blockSplittable = blockSplittable;
+ easyConfig.compressible = compressible;
+ easyConfig.fsConf = fsConf;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
- this.name = name == null ? defaultName : name;
}
- @Override
- public Configuration getFsConf() {
- return fsConf;
+ /**
+ * Revised constructor in which settings are gathered into a configuration object.
+ *
+ * @param name name of the plugin
+ * @param config configuration options for this plugin which determine
+ * developer-defined runtime behavior
+ * @param context the global server-wide drillbit context
+ * @param storageConfig the configuration for the storage plugin that owns this
+ * format plugin
+ * @param formatConfig the Jackson-serialized format configuration as created
+ * by the user in the Drill web console. Holds user-defined options.
+ */
+
+ protected EasyFormatPlugin(String name, EasyFormatConfig config, DrillbitContext context,
+ StoragePluginConfig storageConfig, T formatConfig) {
+ this.name = name;
+ this.easyConfig = config;
+ this.context = context;
+ this.storageConfig = storageConfig;
+ this.formatConfig = formatConfig;
+ if (easyConfig.matcher == null) {
+ easyConfig.matcher = new BasicFormatMatcher(this,
+ easyConfig.fsConf, easyConfig.extensions,
+ easyConfig.compressible);
+ }
}
@Override
- public DrillbitContext getContext() {
- return context;
- }
+ public Configuration getFsConf() { return easyConfig.fsConf; }
@Override
- public String getName() {
- return name;
- }
+ public DrillbitContext getContext() { return context; }
- public abstract boolean supportsPushDown();
+ public EasyFormatConfig easyConfig() { return easyConfig; }
+
+ @Override
+ public String getName() { return name; }
/**
- * Whether or not you can split the format based on blocks within file boundaries. If not, the simple format engine will
- * only split on file boundaries.
+ * Does this plugin support projection push down? That is, can the reader
+ * itself handle the tasks of projecting table columns, creating null
+ * columns for missing table columns, and so on?
*
- * @return True if splittable.
+ * @return <tt>true</tt> if the plugin supports projection push-down,
+ * <tt>false</tt> if Drill should do the task by adding a project operator
*/
- public boolean isBlockSplittable() {
- return blockSplittable;
- }
+
+ public boolean supportsPushDown() { return easyConfig.supportsProjectPushdown; }
+
+ /**
+ * Whether or not you can split the format based on blocks within file
+ * boundaries. If not, the simple format engine will only split on file
+ * boundaries.
+ *
+ * @return <code>true</code> if splittable.
+ */
+ public boolean isBlockSplittable() { return easyConfig.blockSplittable; }
/**
* Indicates whether or not this format could also be in a compression
@@ -125,52 +400,40 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
*
* @return <code>true</code> if it is compressible
*/
- public boolean isCompressible() {
- return compressible;
- }
-
- public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
- List<SchemaPath> columns, String userName) throws ExecutionSetupException;
+ public boolean isCompressible() { return easyConfig.compressible; }
- CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
- final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
-
- if (!columnExplorer.isStarQuery()) {
- scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
- columnExplorer.getTableColumns(), scan.getSelectionRoot());
- scan.setOperatorId(scan.getOperatorId());
- }
+ /**
+ * Return a record reader for the specific file format, when using the original
+ * {@link ScanBatch} scanner.
+ * @param context fragment context
+ * @param dfs Drill file system
+ * @param fileWork metadata about the file to be scanned
+ * @param columns list of projected columns (or may just contain the wildcard)
+ * @param userName the name of the user running the query
+ * @return a record reader for this format
+ * @throws ExecutionSetupException if the record reader cannot be created
+ */
- OperatorContext oContext = context.newOperatorContext(scan);
- final DrillFileSystem dfs;
- try {
- dfs = oContext.newFileSystem(fsConf);
- } catch (IOException e) {
- throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
- }
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ throw new ExecutionSetupException("Must implement getRecordReader() if using the legacy scanner.");
+ }
- List<RecordReader> readers = new LinkedList<>();
- List<Map<String, String>> implicitColumns = Lists.newArrayList();
- Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
- boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
- for (FileWork work : scan.getWorkUnits()){
- RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
- readers.add(recordReader);
- List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false);
- Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
- implicitColumns.add(implicitValues);
- if (implicitValues.size() > mapWithMaxColumns.size()) {
- mapWithMaxColumns = implicitValues;
- }
- }
+ protected CloseableRecordBatch getReaderBatch(final FragmentContext context,
+ final EasySubScan scan) throws ExecutionSetupException {
+ return scanBatchCreator(context.getOptions()).buildScan(context, scan);
+ }
- // all readers should have the same number of implicit columns, add missing ones with value null
- Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
- for (Map<String, String> map : implicitColumns) {
- map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
- }
+ /**
+ * Create the scan batch creator. The default implementation returns the
+ * {@link ClassicScanBatchCreator}, which uses the original {@link ScanBatch}
+ * scanner. Override this method to return a {@link ScanFrameworkCreator}
+ * subclass when the plugin uses the revised, row-set based scan.
+ *
+ * @return the strategy for creating the scan batch for this plugin
+ */
- return new ScanBatch(context, oContext, readers, implicitColumns);
+ protected ScanBatchCreator scanBatchCreator(OptionManager options) {
+ return new ClassicScanBatchCreator(this);
}
public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) {
@@ -219,41 +482,28 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
@Override
- public T getConfig() {
- return formatConfig;
- }
+ public T getConfig() { return formatConfig; }
@Override
- public StoragePluginConfig getStorageConfig() {
- return storageConfig;
- }
+ public StoragePluginConfig getStorageConfig() { return storageConfig; }
@Override
- public boolean supportsRead() {
- return readable;
- }
+ public boolean supportsRead() { return easyConfig.readable; }
@Override
- public boolean supportsWrite() {
- return writable;
- }
+ public boolean supportsWrite() { return easyConfig.writable; }
@Override
- public boolean supportsAutoPartitioning() {
- return false;
- }
+ public boolean supportsAutoPartitioning() { return easyConfig.supportsAutoPartitioning; }
@Override
- public FormatMatcher getMatcher() {
- return matcher;
- }
+ public FormatMatcher getMatcher() { return easyConfig.matcher; }
@Override
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of();
}
- public abstract int getReaderOperatorType();
- public abstract int getWriterOperatorType();
-
+ public int getReaderOperatorType() { return easyConfig.readerOperatorType; }
+ public int getWriterOperatorType() { return easyConfig.writerOperatorType; }
}
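
A minimal sketch (the Foo* names are illustrative assumptions, not code from
this patch) of how a format plugin can opt into the row-set based scan path by
overriding scanBatchCreator():

    public class FooFormatPlugin extends EasyFormatPlugin<FooFormatConfig> {
      @Override
      protected ScanBatchCreator scanBatchCreator(OptionManager options) {
        return new ScanFrameworkCreator(this) {
          @Override
          protected BaseFileScanFramework<?> buildFramework(EasySubScan scan)
              throws ExecutionSetupException {
            // Wire a (hypothetical) reader factory into the standard
            // file-based framework, as FileScanFrameworkCreator does above.
            return new FileScanFramework(
                scan.getColumns(),
                scan.getWorkUnits(),
                plugin.easyConfig().fsConf,
                new FooReaderFactory());
          }
        };
      }
    }
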
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 4449ec054..6a6243cd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -57,9 +58,11 @@ import org.apache.hadoop.fs.Path;
public class EasyGroupScan extends AbstractFileGroupScan {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
- private FileSelection selection;
private final EasyFormatPlugin<?> formatPlugin;
+ private FileSelection selection;
+ private int partitionDepth;
private int maxWidth;
+ private int minWidth = 1;
private List<SchemaPath> columns;
private ListMultimap<Integer, CompleteFileWork> mappings;
@@ -104,9 +107,23 @@ public class EasyGroupScan extends AbstractFileGroupScan {
initFromSelection(selection, formatPlugin);
}
- @JsonIgnore
- public Iterable<CompleteFileWork> getWorkIterable() {
- return () -> Iterators.unmodifiableIterator(chunks.iterator());
+ public EasyGroupScan(
+ String userName,
+ FileSelection selection,
+ EasyFormatPlugin<?> formatPlugin,
+ List<SchemaPath> columns,
+ Path selectionRoot,
+ int minWidth
+ ) throws IOException{
+ this(userName, selection, formatPlugin, columns, selectionRoot);
+
+ // Set the minimum width of this reader. Primarily used for testing
+ // to force parallelism even for small test files.
+ // See ExecConstants.MIN_READER_WIDTH
+ this.minWidth = Math.max(1, Math.min(minWidth, maxWidth));
+
+ // Compute the maximum partition depth across all files.
+ partitionDepth = ColumnExplorer.getPartitionDepth(selection);
}
private EasyGroupScan(final EasyGroupScan that) {
@@ -118,17 +135,23 @@ public class EasyGroupScan extends AbstractFileGroupScan {
chunks = that.chunks;
endpointAffinities = that.endpointAffinities;
maxWidth = that.maxWidth;
+ minWidth = that.minWidth;
mappings = that.mappings;
+ partitionDepth = that.partitionDepth;
+ }
+
+ @JsonIgnore
+ public Iterable<CompleteFileWork> getWorkIterable() {
+ return () -> Iterators.unmodifiableIterator(chunks.iterator());
}
private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
- @SuppressWarnings("resource")
final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
this.selection = selection;
BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable());
- this.maxWidth = chunks.size();
- this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable());
+ maxWidth = chunks.size();
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
public Path getSelectionRoot() {
@@ -136,11 +159,16 @@ public class EasyGroupScan extends AbstractFileGroupScan {
}
@Override
+ @JsonIgnore
+ public int getMinParallelizationWidth() {
+ return minWidth;
+ }
+
+ @Override
public int getMaxParallelizationWidth() {
return maxWidth;
}
-
@Override
public ScanStats getScanStats(final PlannerSettings settings) {
return formatPlugin.getScanStats(settings, this);
@@ -163,7 +191,6 @@ public class EasyGroupScan extends AbstractFileGroupScan {
return columns;
}
-
@JsonIgnore
public FileSelection getFileSelection() {
return selection;
@@ -180,7 +207,6 @@ public class EasyGroupScan extends AbstractFileGroupScan {
return new EasyGroupScan(this);
}
-
@Override
public List<EndpointAffinity> getOperatorAffinity() {
if (endpointAffinities == null) {
@@ -217,7 +243,8 @@ public class EasyGroupScan extends AbstractFileGroupScan {
Preconditions.checkArgument(!filesForMinor.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
+ EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
+ columns, selectionRoot, partitionDepth);
subScan.setOperatorId(this.getOperatorId());
return subScan;
}
@@ -275,5 +302,4 @@ public class EasyGroupScan extends AbstractFileGroupScan {
public boolean canPushdownProjects(List<SchemaPath> columns) {
return formatPlugin.supportsPushDown();
}
-
}
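
For orientation, a summary of the partition-depth flow added in this patch
(the snippets below are condensed from the code above, not new code):

    partitionDepth = ColumnExplorer.getPartitionDepth(selection);    // EasyGroupScan
    new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
        columns, selectionRoot, partitionDepth);                     // per minor fragment
    framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth()); // ScanFrameworkCreator.buildScan()
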
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index fbb3f475c..c51c7ac0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -42,7 +42,8 @@ public class EasySubScan extends AbstractSubScan{
private final List<FileWorkImpl> files;
private final EasyFormatPlugin<?> formatPlugin;
private final List<SchemaPath> columns;
- private Path selectionRoot;
+ private final Path selectionRoot;
+ private final int partitionDepth;
@JsonCreator
public EasySubScan(
@@ -52,7 +53,8 @@ public class EasySubScan extends AbstractSubScan{
@JsonProperty("format") FormatPluginConfig formatConfig,
@JacksonInject StoragePluginRegistry engineRegistry,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("selectionRoot") Path selectionRoot
+ @JsonProperty("selectionRoot") Path selectionRoot,
+ @JsonProperty("partitionDepth") int partitionDepth
) throws ExecutionSetupException {
super(userName);
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
@@ -60,50 +62,40 @@ public class EasySubScan extends AbstractSubScan{
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
+ this.partitionDepth = partitionDepth;
}
- public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
- Path selectionRoot){
+ public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin,
+ List<SchemaPath> columns, Path selectionRoot, int partitionDepth) {
super(userName);
this.formatPlugin = plugin;
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
+ this.partitionDepth = partitionDepth;
}
@JsonProperty
- public Path getSelectionRoot() {
- return selectionRoot;
- }
+ public Path getSelectionRoot() { return selectionRoot; }
+
+ @JsonProperty
+ public int getPartitionDepth() { return partitionDepth; }
@JsonIgnore
- public EasyFormatPlugin<?> getFormatPlugin(){
- return formatPlugin;
- }
+ public EasyFormatPlugin<?> getFormatPlugin() { return formatPlugin; }
@JsonProperty("files")
- public List<FileWorkImpl> getWorkUnits() {
- return files;
- }
+ public List<FileWorkImpl> getWorkUnits() { return files; }
@JsonProperty("storage")
- public StoragePluginConfig getStorageConfig(){
- return formatPlugin.getStorageConfig();
- }
+ public StoragePluginConfig getStorageConfig() { return formatPlugin.getStorageConfig(); }
@JsonProperty("format")
- public FormatPluginConfig getFormatConfig(){
- return formatPlugin.getConfig();
- }
+ public FormatPluginConfig getFormatConfig() { return formatPlugin.getConfig(); }
@JsonProperty("columns")
- public List<SchemaPath> getColumns(){
- return columns;
- }
+ public List<SchemaPath> getColumns() { return columns; }
@Override
- public int getOperatorType() {
- return formatPlugin.getReaderOperatorType();
- }
-
+ public int getOperatorType() { return formatPlugin.getReaderOperatorType(); }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 03ae6f554..05ed1b564 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -18,34 +18,45 @@
package org.apache.drill.exec.store.easy.text;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.FileReaderCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader;
import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings;
+import org.apache.drill.exec.store.easy.text.compliant.v3.CompliantTextBatchReader;
+import org.apache.drill.exec.store.easy.text.compliant.v3.TextParsingSettingsV3;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
import org.apache.drill.exec.store.text.DrillTextRecordWriter;
@@ -61,91 +72,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
- private final static String DEFAULT_NAME = "text";
+public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig>
+ implements FileReaderCreator {
+ private final static String PLUGIN_NAME = "text";
- public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
- super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true,
- Collections.emptyList(), DEFAULT_NAME);
- }
-
- public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
- TextFormatConfig formatPluginConfig) {
- super(name, context, fsConf, config, formatPluginConfig, true, false, true, true,
- formatPluginConfig.getExtensions(), DEFAULT_NAME);
- }
-
-
- @Override
- public RecordReader getRecordReader(FragmentContext context,
- DrillFileSystem dfs,
- FileWork fileWork,
- List<SchemaPath> columns,
- String userName) {
- Path path = dfs.makeQualified(fileWork.getPath());
- FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
-
- if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
- TextParsingSettings settings = new TextParsingSettings();
- settings.set(formatConfig);
- return new CompliantTextRecordReader(split, dfs, settings, columns);
- } else {
- char delim = formatConfig.getFieldDelimiter();
- return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
- }
- }
-
- @Override
- public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
- throws IOException {
- return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
- }
-
- @Override
- public boolean supportsStatistics() {
- return false;
- }
-
- @Override
- public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) {
- long data = 0;
- for (final CompleteFileWork work : scan.getWorkIterable()) {
- data += work.getTotalBytes();
- }
- final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
- final double estRowCount = data / estimatedRowSize;
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
- }
-
- @Override
- public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
- final Map<String, String> options = new HashMap<>();
-
- options.put("location", writer.getLocation());
- FragmentHandle handle = context.getHandle();
- String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
- options.put("prefix", fragmentId);
- options.put("separator", getConfig().getFieldDelimiterAsString());
- options.put("extension", getConfig().getExtensions().get(0));
-
- RecordWriter recordWriter = new DrillTextRecordWriter(
- context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
- recordWriter.init(options);
-
- return recordWriter;
- }
-
- @JsonTypeName("text") @JsonInclude(Include.NON_DEFAULT)
+ @JsonTypeName(PLUGIN_NAME)
+ @JsonInclude(Include.NON_DEFAULT)
public static class TextFormatConfig implements FormatPluginConfig {
public List<String> extensions = ImmutableList.of();
@@ -157,34 +89,18 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
public boolean skipFirstLine = false;
public boolean extractHeader = false;
- public List<String> getExtensions() {
- return extensions;
- }
-
- public char getQuote() {
- return quote;
- }
-
- public char getEscape() {
- return escape;
- }
+ public TextFormatConfig() { }
- public char getComment() {
- return comment;
- }
-
- public String getLineDelimiter() {
- return lineDelimiter;
- }
-
- public char getFieldDelimiter() {
- return fieldDelimiter;
- }
+ public List<String> getExtensions() { return extensions; }
+ public char getQuote() { return quote; }
+ public char getEscape() { return escape; }
+ public char getComment() { return comment; }
+ public String getLineDelimiter() { return lineDelimiter; }
+ public char getFieldDelimiter() { return fieldDelimiter; }
+ public boolean isSkipFirstLine() { return skipFirstLine; }
@JsonIgnore
- public boolean isHeaderExtractionEnabled() {
- return extractHeader;
- }
+ public boolean isHeaderExtractionEnabled() { return extractHeader; }
@JsonIgnore
public String getFieldDelimiterAsString(){
@@ -197,10 +113,6 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
this.fieldDelimiter = delimiter;
}
- public boolean isSkipFirstLine() {
- return skipFirstLine;
- }
-
@Override
public int hashCode() {
final int prime = 31;
@@ -262,24 +174,173 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
}
return true;
}
+ }
+
+ public static class TextScanBatchCreator extends ScanFrameworkCreator {
+
+ private final FileReaderCreator readerCreator;
+ private final TextFormatPlugin textPlugin;
+
+ public TextScanBatchCreator(TextFormatPlugin plugin,
+ FileReaderCreator readerCreator) {
+ super(plugin);
+ this.readerCreator = readerCreator;
+ textPlugin = plugin;
+ }
+
+ @Override
+ protected ColumnsScanFramework buildFramework(
+ EasySubScan scan) throws ExecutionSetupException {
+ ColumnsScanFramework framework = new ColumnsScanFramework(
+ scan.getColumns(),
+ scan.getWorkUnits(),
+ plugin.easyConfig().fsConf,
+ readerCreator);
+
+ // If this format has no headers, or wants to skip them,
+ // then we must use the "columns" array column to hold the data.
+
+ framework.requireColumnsArray(
+ ! textPlugin.getConfig().isHeaderExtractionEnabled());
+
+ // Text files handle nulls in an unusual way. Missing columns
+ // are set to required Varchar and filled with blanks. Yes, this
+ // means that the SQL statement or code cannot differentiate missing
+ // columns from empty columns, but that is how CSV and other text
+ // files have been defined within Drill.
+
+ framework.setNullType(
+ MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.REQUIRED)
+ .build());
+
+ return framework;
+ }
+ }
+
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+ this(name, context, fsConf, storageConfig, new TextFormatConfig());
+ }
+
+ public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+ TextFormatConfig formatPluginConfig) {
+ super(name, easyConfig(fsConf, formatPluginConfig), context, config, formatPluginConfig);
+ }
+
+ private static EasyFormatConfig easyConfig(Configuration fsConf, TextFormatConfig pluginConfig) {
+ EasyFormatConfig config = new EasyFormatConfig();
+ config.readable = true;
+ config.writable = true;
+ config.blockSplittable = true;
+ config.compressible = true;
+ config.supportsProjectPushdown = true;
+ config.extensions = pluginConfig.getExtensions();
+ config.fsConf = fsConf;
+ config.defaultName = PLUGIN_NAME;
+ config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE;
+ config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE;
+ return config;
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
+ throws IOException {
+ return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns, OptionManager options) throws IOException {
+ return new EasyGroupScan(userName, selection, this, columns,
+ selection.selectionRoot,
+ // Some paths provide a null option manager. In that case, default to a
+ // min width of 1; just like the base class.
+ options == null ? 1 : (int) options.getLong(ExecConstants.MIN_READER_WIDTH_KEY));
+ }
+
+ @Override
+ protected ScanBatchCreator scanBatchCreator(OptionManager options) {
+ // Create the "legacy", "V2" reader or the new "V3" version based on
+ // the result set loader. This code should be temporary: the two
+ // readers provide identical functionality for the user; only the
+ // internals differ.
+ if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) {
+ return new TextScanBatchCreator(this, this);
+ } else {
+ return new ClassicScanBatchCreator(this);
+ }
+ }
+
+ // TODO: Remove this once the V2 reader is removed.
+ @Override
+ public RecordReader getRecordReader(FragmentContext context,
+ DrillFileSystem dfs,
+ FileWork fileWork,
+ List<SchemaPath> columns,
+ String userName) {
+ Path path = dfs.makeQualified(fileWork.getPath());
+ FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
+
+ if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
+ TextParsingSettings settings = new TextParsingSettings();
+ settings.set(formatConfig);
+ return new CompliantTextRecordReader(split, dfs, settings, columns);
+ } else {
+ char delim = formatConfig.getFieldDelimiter();
+ return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
+ }
+ }
+ @Override
+ public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException {
+ final Map<String, String> options = new HashMap<>();
+ options.put("location", writer.getLocation());
+ FragmentHandle handle = context.getHandle();
+ String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+ options.put("prefix", fragmentId);
+ options.put("separator", getConfig().getFieldDelimiterAsString());
+ options.put("extension", getConfig().getExtensions().get(0));
+ RecordWriter recordWriter = new DrillTextRecordWriter(
+ context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf());
+ recordWriter.init(options);
+
+ return recordWriter;
}
@Override
- public int getReaderOperatorType() {
- return CoreOperatorType.TEXT_SUB_SCAN_VALUE;
+ public ManagedReader<ColumnsSchemaNegotiator> makeBatchReader(
+ DrillFileSystem dfs,
+ FileSplit split) throws ExecutionSetupException {
+ TextParsingSettingsV3 settings = new TextParsingSettingsV3();
+ settings.set(getConfig());
+ return new CompliantTextBatchReader(split, dfs, settings);
+ }
+ @Override
+ public boolean supportsStatistics() {
+ return false;
}
@Override
- public int getWriterOperatorType() {
- return CoreOperatorType.TEXT_WRITER_VALUE;
+ public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+ throw new UnsupportedOperationException("unimplemented");
}
@Override
- public boolean supportsPushDown() {
- return true;
+ public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+ throw new UnsupportedOperationException("unimplemented");
}
+ @Override
+ protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) {
+ long data = 0;
+ for (final CompleteFileWork work : scan.getWorkIterable()) {
+ data += work.getTotalBytes();
+ }
+ final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
+ final double estRowCount = data / estimatedRowSize;
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
new file mode 100644
index 000000000..0ed3155a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Original version of the "compliant" text reader. This is version 2 of
+ * the text reader. This version is retained for temporary backward
+ * compatibility as we productize the newer version 3 based on the
+ * row set framework.
+ * <p>
+ * TODO: Remove the files in this package and move the files from the
+ * "v3" sub-package here once the version 3 implementation stabilizes.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
new file mode 100644
index 000000000..6bf0bb69c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+
+public abstract class BaseFieldOutput extends TextOutput {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class);
+ private static final int MAX_FIELD_LENGTH = 1024 * 64;
+
+ // track which field is getting appended
+ protected int currentFieldIndex = -1;
+ // track chars within field
+ protected int currentDataPointer;
+ // track if field is still getting appended
+ private boolean fieldOpen = true;
+ // holds chars for a field
+ protected byte[] fieldBytes;
+ protected final RowSetLoader writer;
+ private final boolean[] projectionMask;
+ protected final int maxField;
+ protected boolean fieldProjected;
+
+ /**
+ * Initialize the field output for one of three scenarios:
+ * <ul>
+ * <li>SELECT all: SELECT *, SELECT columns. Indicated by a max field
+ * value other than -1.</li>
+ * <li>SELECT none: SELECT COUNT(*), etc. Indicated by a max field
+ * of -1.</li>
+ * <li>SELECT a, b, c: indicated by a non-null projection mask that
+ * identifies the indexes of the fields to be selected. In this case,
+ * this constructor computes the maximum field.</li>
+ * </ul>
+ *
+ * @param writer Row set writer that provides access to the writer for
+ * each column
+ * @param maxField the index of the last field to store. May be -1 if no
+ * fields are to be stored. Computed if the projection mask is set
+ * @param projectionMask a boolean array indicating which fields are
+ * to be projected to the output. Optional
+ */
+
+ public BaseFieldOutput(RowSetLoader writer, int maxField, boolean[] projectionMask) {
+ this.writer = writer;
+ this.projectionMask = projectionMask;
+
+ // If no projection mask is defined, then we want all columns
+ // up to the max field, which may be -1 if we want to select
+ // nothing.
+
+ if (projectionMask == null) {
+ this.maxField = maxField;
+ } else {
+
+ // Otherwise, use the projection mask to determine
+ // which fields are to be projected. (The file may well
+ // contain more than the projected set.)
+
+ int end = projectionMask.length - 1;
+ while (end >= 0 && ! projectionMask[end]) {
+ end--;
+ }
+ this.maxField = end;
+ }
+
+ // If we project at least one field, allocate a buffer.
+
+ if (maxField >= 0) {
+ fieldBytes = new byte[MAX_FIELD_LENGTH];
+ }
+ }
+
+ /**
+ * Start a new record. Resets all pointers.
+ */
+
+ @Override
+ public void startRecord() {
+ currentFieldIndex = -1;
+ fieldOpen = false;
+ writer.start();
+ }
+
+ @Override
+ public void startField(int index) {
+ assert index == currentFieldIndex + 1;
+ currentFieldIndex = index;
+ currentDataPointer = 0;
+ fieldOpen = true;
+
+ // Figure out if this field is projected.
+
+ if (projectionMask == null) {
+ fieldProjected = currentFieldIndex <= maxField;
+ } else if (currentFieldIndex >= projectionMask.length) {
+ fieldProjected = false;
+ } else {
+ fieldProjected = projectionMask[currentFieldIndex];
+ }
+ }
+
+ @Override
+ public void append(byte data) {
+ if (! fieldProjected) {
+ return;
+ }
+ if (currentDataPointer >= MAX_FIELD_LENGTH - 1) {
+ throw UserException
+ .unsupportedError()
+ .message("Text column is too large.")
+ .addContext("Column", currentFieldIndex)
+ .addContext("Limit", MAX_FIELD_LENGTH)
+ .build(logger);
+ }
+
+ fieldBytes[currentDataPointer++] = data;
+ }
+
+ @Override
+ public boolean endField() {
+ fieldOpen = false;
+ return currentFieldIndex < maxField;
+ }
+
+ @Override
+ public boolean endEmptyField() {
+ return endField();
+ }
+
+ @Override
+ public void finishRecord() {
+ if (fieldOpen) {
+ endField();
+ }
+ writer.save();
+ }
+
+ @Override
+ public long getRecordCount() {
+ return writer.rowCount();
+ }
+
+ @Override
+ public boolean isFull() {
+ return writer.isFull();
+ }
+}
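
An illustrative walk-through of the projection-mask handling in the
constructor above (the mask values are made up for this example):

    // Given projectionMask = {true, false, true, false}, the constructor
    // trims trailing unprojected entries:
    int end = projectionMask.length - 1;        // end = 3
    while (end >= 0 && !projectionMask[end]) {  // index 3 is unprojected
      end--;
    }
    // end == 2, so maxField = 2: a field buffer is allocated and fields
    // 0..2 are buffered, while startField(3) marks the field as
    // unprojected and append() then discards its bytes.
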
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
new file mode 100644
index 000000000..e489003c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * New text reader that complies with the RFC 4180 standard for text/CSV files.
+ */
+public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextBatchReader.class);
+
+ private static final int MAX_RECORDS_PER_BATCH = 8096;
+ private static final int READ_BUFFER = 1024 * 1024;
+ private static final int WHITE_SPACE_BUFFER = 64 * 1024;
+
+ // settings to be used while parsing
+ private final TextParsingSettingsV3 settings;
+ // Chunk of the file to be read by this reader
+ private final FileSplit split;
+ // text reader implementation
+ private TextReader reader;
+ // input buffer
+ private DrillBuf readBuffer;
+ // working buffer to handle whitespaces
+ private DrillBuf whitespaceBuffer;
+ private final DrillFileSystem dfs;
+
+ private RowSetLoader writer;
+
+ public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) {
+ this.split = split;
+ this.settings = settings;
+ this.dfs = dfs;
+
+ // Validate. Otherwise, these problems show up later as a data
+ // read error which is very confusing.
+
+ if (settings.getNewLineDelimiter().length == 0) {
+ throw UserException
+ .validationError()
+ .message("The text format line delimiter cannot be blank.")
+ .build(logger);
+ }
+ }
+
+ /**
+ * Performs the initial setup required for the record reader.
+ * Initializes the input stream, handling of the output record batch
+ * and the actual reader to be used.
+ * @param schemaNegotiator negotiator used to define the output schema and to
+ * obtain the operator context from which buffers are allocated and managed
+ */
+
+ @Override
+ public boolean open(ColumnsSchemaNegotiator schemaNegotiator) {
+ final OperatorContext context = schemaNegotiator.context();
+
+ // Note: DO NOT use managed buffers here. They remain in existence
+ // until the fragment is shut down. The buffers here are large.
+ // If we scan 1000 files, and allocate 1 MB for each, we end up
+ // holding onto 1 GB of memory in managed buffers.
+ // Instead, we allocate the buffers explicitly, and must free
+ // them.
+
+ readBuffer = context.getAllocator().buffer(READ_BUFFER);
+ whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
+
+ // TODO: Set this based on size of record rather than
+ // absolute count.
+
+ schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH);
+
+ // setup Output, Input, and Reader
+ try {
+ TextOutput output;
+
+ if (settings.isHeaderExtractionEnabled()) {
+ output = openWithHeaders(schemaNegotiator);
+ } else {
+ output = openWithoutHeaders(schemaNegotiator);
+ }
+ if (output == null) {
+ return false;
+ }
+ openReader(output);
+ return true;
+ } catch (final IOException e) {
+ throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
+ }
+ }
+
+ /**
+ * Extract header and use that to define the reader schema.
+ */
+
+ private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException {
+ final String [] fieldNames = extractHeader();
+ if (fieldNames == null) {
+ return null;
+ }
+ final TupleMetadata schema = new TupleSchema();
+ for (final String colName : fieldNames) {
+ schema.addColumn(MetadataUtils.newScalar(colName, MinorType.VARCHAR, DataMode.REQUIRED));
+ }
+ schemaNegotiator.setTableSchema(schema, true);
+ writer = schemaNegotiator.build().writer();
+ return new FieldVarCharOutput(writer);
+ }
+
+ /**
+ * When no headers, create a single array column "columns".
+ */
+
+ private TextOutput openWithoutHeaders(
+ ColumnsSchemaNegotiator schemaNegotiator) {
+ final TupleMetadata schema = new TupleSchema();
+ schema.addColumn(MetadataUtils.newScalar(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR, DataMode.REPEATED));
+ schemaNegotiator.setTableSchema(schema, true);
+ writer = schemaNegotiator.build().writer();
+ return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes());
+ }
+
+ private void openReader(TextOutput output) throws IOException {
+ logger.trace("Opening file {}", split.getPath());
+ final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath());
+ final TextInput input = new TextInput(settings, stream, readBuffer,
+ split.getStart(), split.getStart() + split.getLength());
+
+ // setup Reader using Input and Output
+ reader = new TextReader(settings, input, output, whitespaceBuffer);
+ reader.start();
+ }
+
+ /**
+ * Extracts header from text file.
+ * Currently the header is assumed to be the first line when headerExtractionEnabled is set to true.
+ * TODO: enhance to support more common header patterns
+ * @return field name strings
+ */
+
+ private String [] extractHeader() throws IOException {
+ assert settings.isHeaderExtractionEnabled();
+
+ // don't skip header in case skipFirstLine is set true
+ settings.setSkipFirstLine(false);
+
+ final HeaderBuilder hOutput = new HeaderBuilder(split.getPath());
+
+ // setup Input using InputStream
+ // we should read the file header irrespective of the split given to this reader
+ final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath());
+ final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength());
+
+ // setup Reader using Input and Output
+ this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer);
+ reader.start();
+
+ // extract first row only
+ reader.parseNext();
+
+ // grab the field names from output
+ final String [] fieldNames = hOutput.getHeaders();
+
+ // cleanup and set to skip the first line next time we read input
+ reader.close();
+ settings.setSkipFirstLine(true);
+
+ readBuffer.clear();
+ whitespaceBuffer.clear();
+ return fieldNames;
+ }
+
+ /**
+ * Generates the next record batch.
+ * @return true if the batch was filled and more input remains; false on
+ * the batch that reaches EOF (the scan operator still processes any rows
+ * in that final batch)
+ */
+
+ @Override
+ public boolean next() {
+ reader.resetForNextBatch();
+
+ try {
+ boolean more = false;
+ while (! writer.isFull()) {
+ more = reader.parseNext();
+ if (! more) {
+ break;
+ }
+ }
+ reader.finishBatch();
+
+ // Return false on the batch that hits EOF. The scan operator
+ // knows to process any rows in this final batch.
+
+ return more && writer.rowCount() > 0;
+ } catch (IOException | TextParsingException e) {
+ if (e.getCause() != null && e.getCause() instanceof UserException) {
+ throw (UserException) e.getCause();
+ }
+ throw UserException.dataReadError(e)
+ .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
+ split.getPath(), reader.getPos())
+ .build(logger);
+ }
+ }
+
+ /**
+ * Cleanup state once we are finished processing all the records.
+ * This closes the underlying input stream and releases the read buffers.
+ */
+ @Override
+ public void close() {
+
+ // Release the buffers allocated above. Double-check to handle
+ // unexpected multiple calls to close().
+
+ if (readBuffer != null) {
+ readBuffer.release();
+ readBuffer = null;
+ }
+ if (whitespaceBuffer != null) {
+ whitespaceBuffer.release();
+ whitespaceBuffer = null;
+ }
+ try {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ } catch (final IOException e) {
+ logger.warn("Exception while closing stream.", e);
+ }
+ }
+}
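
For reference, a simplified sketch (not code from this patch) of how the scan
framework is expected to drive this reader through its lifecycle:

    CompliantTextBatchReader reader =
        new CompliantTextBatchReader(split, dfs, settings);
    if (reader.open(schemaNegotiator)) {   // allocate buffers, bind the RowSetLoader
      while (true) {
        boolean more = reader.next();      // fill one batch via TextReader
        // the framework harvests any rows in the batch here, including the
        // final (EOF) batch for which next() returns false
        if (!more) { break; }
      }
    }
    reader.close();                        // release read and whitespace buffers
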
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
new file mode 100644
index 000000000..df48a5548
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Class is responsible for generating record batches for text file inputs. We generate
+ * a record batch with a set of varchar vectors. A varchar vector contains all the field
+ * values for a given column. Each record is a single value within each vector of the set.
+ */
+class FieldVarCharOutput extends BaseFieldOutput {
+
+ /**
+ * We configure the output based on the projection information carried by
+ * the row set writer's schema: one VarChar writer per projected field.
+ * @param writer the row set writer that provides the column writers and
+ * the per-column projection information
+ */
+ public FieldVarCharOutput(RowSetLoader writer) {
+ super(writer,
+ TextReader.MAXIMUM_NUMBER_COLUMNS,
+ makeMask(writer));
+ }
+
+ private static boolean[] makeMask(RowSetLoader writer) {
+ final TupleMetadata schema = writer.tupleSchema();
+ final boolean projectionMask[] = new boolean[schema.size()];
+ for (int i = 0; i < schema.size(); i++) {
+ projectionMask[i] = schema.metadata(i).isProjected();
+ }
+ return projectionMask;
+ }
+
+ @Override
+ public boolean endField() {
+ if (fieldProjected) {
+ writer.scalar(currentFieldIndex)
+ .setBytes(fieldBytes, currentDataPointer);
+ }
+
+ return super.endField();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
new file mode 100644
index 000000000..62eafc898
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+/**
+ * Text output that implements a header reader/parser.
+ * The caller parses out the characters of each header;
+ * this class assembles UTF-8 bytes into Unicode characters,
+ * fixes invalid characters (those not legal for SQL symbols),
+ * and maps duplicate names to unique names.
+ * <p>
+ * That is, this class is as permissive as possible with file
+ * headers to avoid spurious query failures for trivial reasons.
+ */
+
+// Note: this class uses Java heap strings and the usual Java
+// convenience classes. Since we do heavy Unicode string operations,
+// and read a single row, there is no good reason to try to use
+// value vectors and direct memory for this task.
+
+public class HeaderBuilder extends TextOutput {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class);
+
+ /**
+ * Maximum Drill symbol length, as enforced for headers.
+ * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+ * identifier documentation</a>
+ */
+ // TODO: Replace with the proper constant, if available
+ public static final int MAX_HEADER_LEN = 1024;
+
+ /**
+ * Prefix used to replace non-alphabetic characters at the start of
+ * a column name. For example, $foo becomes col_foo. Used
+ * because SQL does not allow _foo.
+ */
+
+ public static final String COLUMN_PREFIX = "col_";
+
+ /**
+ * Prefix used to create numbered columns for missing
+ * headers. Typical names: column_1, column_2, ...
+ */
+
+ public static final String ANONYMOUS_COLUMN_PREFIX = "column_";
+
+ public final Path filePath;
+ public final List<String> headers = new ArrayList<>();
+ public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);
+
+ public HeaderBuilder(Path filePath) {
+ this.filePath = filePath;
+ }
+
+ @Override
+ public void startField(int index) {
+ currentField.clear();
+ }
+
+ @Override
+ public boolean endField() {
+ String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8);
+ header = validateSymbol(header);
+ headers.add(header);
+ return true;
+ }
+
+ @Override
+ public boolean endEmptyField() {
+
+ // Empty header will be rewritten to "column_<n>".
+
+ return endField();
+ }
+
+ /**
+ * Validate the header name according to the SQL lexical rules.
+ * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
+ * identifier documentation</a>
+ * @param header the header name to validate
+ */
+
+ // TODO: Replace with existing code, if any.
+ private String validateSymbol(String header) {
+ header = header.trim();
+
+ // To avoid unnecessary query failures, just make up a column name
+ // if the name is missing or all blanks.
+
+ if (header.isEmpty()) {
+ return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+ }
+ if (! Character.isAlphabetic(header.charAt(0))) {
+ return rewriteHeader(header);
+ }
+ for (int i = 1; i < header.length(); i++) {
+ char ch = header.charAt(i);
+ if (! Character.isAlphabetic(ch) &&
+ ! Character.isDigit(ch) && ch != '_') {
+ return rewriteHeader(header);
+ }
+ }
+ return header;
+ }
+
+ /**
+ * Given an invalid header, rewrite it to replace illegal characters
+ * with valid ones. The header won't be what the user specified,
+ * but it will be a valid SQL identifier. This solution avoids failing
+ * queries due to corrupted or invalid header data.
+ * <p>
+ * Names with invalid first characters are mapped to "col_". Example:
+ * $foo maps to col_foo. If the only character is non-alphabetic, treat
+ * the column as anonymous and create a generic name: column_4, etc.
+ * <p>
+ * This mapping could create a column that exceeds the maximum length
+ * of 1024. Since that is not really a hard limit, we just live with the
+ * extra few characters.
+ *
+ * @param header the original header
+ * @return the rewritten header, valid for SQL
+ */
+
+ private String rewriteHeader(String header) {
+ final StringBuilder buf = new StringBuilder();
+
+ // If starts with non-alphabetic, can't map the character to
+ // underscore, so just tack on a prefix.
+
+ char ch = header.charAt(0);
+ if (Character.isAlphabetic(ch)) {
+ buf.append(ch);
+ } else if (Character.isDigit(ch)) {
+ buf.append(COLUMN_PREFIX);
+ buf.append(ch);
+
+ // For the strange case of only one character, format
+ // the same as an empty header.
+
+ } else if (header.length() == 1) {
+ return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
+ } else {
+ buf.append(COLUMN_PREFIX);
+ }
+
+ // Convert all remaining invalid characters to underscores
+
+ for (int i = 1; i < header.length(); i++) {
+ ch = header.charAt(i);
+ if (Character.isAlphabetic(ch) ||
+ Character.isDigit(ch) || ch == '_') {
+ buf.append(ch);
+ } else {
+ buf.append("_");
+ }
+ }
+ return buf.toString();
+ }
+
+ @Override
+ public void append(byte data) {
+
+ // Ensure the data fits. Note that, if the name is Unicode, the actual
+ // number of characters might be less than the limit even though the
+ // byte count exceeds the limit. Fixing this, in general, would require
+ // a buffer four times larger, so we leave that as a later improvement
+ // if ever needed.
+
+ try {
+ currentField.put(data);
+ } catch (BufferOverflowException e) {
+ throw UserException.dataReadError()
+ .message("Column exceeds maximum length of %d", MAX_HEADER_LEN)
+ .addContext("File Path", filePath.toString())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void finishRecord() {
+ if (headers.isEmpty()) {
+ throw UserException.dataReadError()
+ .message("The file must define at least one header.")
+ .addContext("File Path", filePath.toString())
+ .build(logger);
+ }
+
+ // Force headers to be unique.
+
+ final Set<String> idents = new HashSet<String>();
+ for (int i = 0; i < headers.size(); i++) {
+ String header = headers.get(i);
+ String key = header.toLowerCase();
+
+ // Is the header a duplicate?
+
+ if (idents.contains(key)) {
+
+ // Make header unique by appending a suffix.
+ // This loop must end because we have a finite
+ // number of headers.
+ // The original column is assumed to be "1", so
+ // the first duplicate is "2", and so on.
+ // Note that this will map columns of the form:
+ // "col,col,col_2,col_2_2" to
+ // "col", "col_2", "col_2_2", "col_2_2_2".
+ // No mapping scheme is perfect...
+
+ for (int l = 2; ; l++) {
+ final String rewritten = header + "_" + l;
+ key = rewritten.toLowerCase();
+ if (! idents.contains(key)) {
+ headers.set(i, rewritten);
+ break;
+ }
+ }
+ }
+ idents.add(key);
+ }
+ }
+
+ @Override
+ public void startRecord() { }
+
+ public String[] getHeaders() {
+
+ // Just return the headers: any needed checks were done in
+ // finishRecord()
+
+    final String[] array = new String[headers.size()];
+ return headers.toArray(array);
+ }
+
+ // Not used.
+ @Override
+ public long getRecordCount() { return 0; }
+
+ @Override
+ public boolean isFull() { return false; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
new file mode 100644
index 000000000..13b44509e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+
+/**
+ * Produces record batches for text file input as a single repeated VarChar
+ * vector (the `columns` array). Each record is a single entry in that vector,
+ * holding all of the record's fields as individual array elements.
+ */
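+
+// A minimal wiring sketch (hypothetical, not part of this patch): given a
+// RowSetLoader whose single column is a repeated VarChar named `columns`,
+//
+//   RepeatedVarCharOutput output = new RepeatedVarCharOutput(loader, null);
+//
+// a null projection mask projects every array element (SELECT * or
+// SELECT `columns`), while a boolean[] mask restricts writes to the
+// requested columns[x] indexes.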
+public class RepeatedVarCharOutput extends BaseFieldOutput {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedVarCharOutput.class);
+
+ private final ScalarWriter columnWriter;
+ private final ArrayWriter arrayWriter;
+
+  /**
+   * Provide the row set loader (which must have just one repeated VarChar
+   * column) and an optional array projection mask.
+   *
+   * @param loader row set loader holding the single repeated VarChar column
+   * @param projectionMask optional mask of projected array indexes;
+   *        null to project all elements
+   */
+
+ public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) {
+ super(loader,
+ maxField(loader, projectionMask),
+ projectionMask);
+ arrayWriter = writer.array(0);
+ columnWriter = arrayWriter.scalar();
+ }
+
+ private static int maxField(RowSetLoader loader, boolean[] projectionMask) {
+
+ // If the one and only field (`columns`) is not selected, then this
+ // is a COUNT(*) or similar query. Select nothing.
+
+ if (! loader.tupleSchema().metadata(0).isProjected()) {
+ return -1;
+ }
+
+ // If this is SELECT * or SELECT `columns` query, project all
+ // possible fields.
+
+ if (projectionMask == null) {
+ return TextReader.MAXIMUM_NUMBER_COLUMNS;
+ }
+
+ // Else, this is a SELECT columns[x], columns[y], ... query.
+ // Project only the requested element members (fields).
+
+ int end = projectionMask.length - 1;
+ while (end >= 0 && ! projectionMask[end]) {
+ end--;
+ }
+ return end;
+ }
+
+ /**
+ * Write the value into an array position. Rules:
+ * <ul>
+ * <li>If there is no projection mask, collect all columns.</li>
+ * <li>If a selection mask is present, we previously found the index
+ * of the last projection column (<tt>maxField</tt>). If the current
+ * column is beyond that number, ignore the data and stop accepting
+ * columns.</li>
+ * <li>If the column is projected, add the data to the array.</li>
+ * <li>If the column is not projected, add a blank value to the
+ * array.</li>
+ * </ul>
+ * The above ensures that we leave no holes in the portion of the
+ * array that is projected (by adding blank columns where needed),
+ * and we just ignore columns past the end of the projected part
+ * of the array. (No need to fill holes at the end.)
+ */
+
+ @Override
+ public boolean endField() {
+
+ // Skip the field if past the set of projected fields.
+
+ if (currentFieldIndex > maxField) {
+ return false;
+ }
+
+ // If the field is projected, save it.
+
+ if (fieldProjected) {
+
+ // Repeated var char will create as many entries as there are columns.
+ // If this would exceed the maximum, issue an error. Note that we do
+ // this only if all fields are selected; the same query will succeed if
+ // the user does a COUNT(*) or SELECT columns[x], columns[y], ...
+
+ if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
+ throw UserException
+ .unsupportedError()
+ .message("Text file contains too many fields")
+ .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS)
+ .build(logger);
+ }
+
+ // Save the field.
+
+ columnWriter.setBytes(fieldBytes, currentDataPointer);
+ } else {
+
+ // The field is not projected.
+ // Must write a value into this array position, but
+ // the value should be empty.
+
+ columnWriter.setBytes(fieldBytes, 0);
+ }
+
+ // Return whether the rest of the fields should be read.
+
+ return super.endField();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
new file mode 100644
index 000000000..70c43b7ee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+class StreamFinishedPseudoException extends RuntimeException {
+
+ public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException();
+
+ private StreamFinishedPseudoException() {
+ super("", null, false, true);
+
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
new file mode 100644
index 000000000..26fade6d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
+
+/**
+ * Fronts an InputStream to provide a byte-consumption interface.
+ * Also ensures that only the lines belonging to this split are read.
+ */
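+
+// A minimal consumption sketch (hypothetical driver code, not part of this
+// patch): start() positions the stream at the first complete record of the
+// split, then the caller pulls normalized bytes until the pseudo-exception
+// signals the end of the split.
+//
+//   input.start();
+//   try {
+//     while (true) {
+//       byte b = input.nextChar();   // newline-normalized; maintains line count
+//       consume(b);                  // hypothetical consumer
+//     }
+//   } catch (StreamFinishedPseudoException e) {
+//     // end of this split reached
+//   } finally {
+//     input.close();
+//   }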
+final class TextInput {
+
+ private final byte[] lineSeparator;
+ private final byte normalizedLineSeparator;
+ private final TextParsingSettingsV3 settings;
+
+ private long lineCount;
+ private long charCount;
+
+ /**
+ * The starting position in the file.
+ */
+ private final long startPos;
+ private final long endPos;
+
+ private long streamPos;
+
+ private final Seekable seekable;
+ private final FSDataInputStream inputFS;
+ private final InputStream input;
+
+ private final DrillBuf buffer;
+ private final ByteBuffer underlyingBuffer;
+ private final long bStart;
+ private final long bStartMinus1;
+
+ private final boolean bufferReadable;
+
+  /**
+   * Tracks the partial line separator left over from the previous read so
+   * that it can be re-applied at the start of the next read; -1 if there is
+   * no carry-over.
+   */
+ private int remByte = -1;
+
+ /**
+ * The current position in the buffer.
+ */
+ public int bufferPtr;
+
+ /**
+ * The quantity of valid data in the buffer.
+ */
+ public int length = -1;
+
+ private boolean endFound = false;
+
+ /**
+ * Creates a new instance with the mandatory characters for handling newlines
+ * transparently. lineSeparator the sequence of characters that represent a
+ * newline, as defined in {@link Format#getLineSeparator()}
+ * normalizedLineSeparator the normalized newline character (as defined in
+ * {@link Format#getNormalizedNewline()}) that is used to replace any
+ * lineSeparator sequence found in the input.
+ */
+ public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+ this.lineSeparator = settings.getNewLineDelimiter();
+ byte normalizedLineSeparator = settings.getNormalizedNewLine();
+ Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
+ boolean isCompressed = input instanceof CompressionInputStream;
+ Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
+
+ // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely.
+ if (isCompressed && endPos > 0) {
+ endPos = Long.MAX_VALUE;
+ }
+
+ this.input = input;
+ this.seekable = (Seekable) input;
+ this.settings = settings;
+
+ if (input instanceof FSDataInputStream) {
+ this.inputFS = (FSDataInputStream) input;
+ this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
+ } else {
+ this.inputFS = null;
+ this.bufferReadable = false;
+ }
+
+ this.startPos = startPos;
+ this.endPos = endPos;
+
+ this.normalizedLineSeparator = normalizedLineSeparator;
+
+ this.buffer = readBuffer;
+ this.bStart = buffer.memoryAddress();
+ this.bStartMinus1 = bStart -1;
+ this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
+ }
+
+  /**
+   * Position the input for the start of reading. If this is a non-zero split,
+   * or if skipFirstLine is enabled, moves to the start of the next complete line.
+   * @throws IOException for input file read errors
+   */
+ final void start() throws IOException {
+ lineCount = 0;
+ if(startPos > 0){
+ seekable.seek(startPos);
+ }
+
+ updateBuffer();
+ if (length > 0) {
+ if (startPos > 0 || settings.isSkipFirstLine()) {
+
+ // move to next full record.
+ skipLines(1);
+ }
+ }
+ }
+
+
+ /**
+   * Helper method to get the most recent characters consumed since the last record started.
+   * May return an incomplete string since stream rewind is not supported; currently
+   * returns a blank placeholder string.
+ *
+ * @return String of last few bytes.
+ * @throws IOException for input file read errors
+ */
+ public String getStringSinceMarkForError() throws IOException {
+ return " ";
+ }
+
+ long getPos() {
+ return streamPos + bufferPtr;
+ }
+
+ public void mark() { }
+
+ /**
+ * Read some more bytes from the stream. Uses the zero copy interface if available.
+ * Otherwise, does byte copy.
+ *
+ * @throws IOException for input file read errors
+ */
+ private void read() throws IOException {
+ if (bufferReadable) {
+
+ if (remByte != -1) {
+ for (int i = 0; i <= remByte; i++) {
+ underlyingBuffer.put(lineSeparator[i]);
+ }
+ remByte = -1;
+ }
+ length = inputFS.read(underlyingBuffer);
+
+ } else {
+ byte[] b = new byte[underlyingBuffer.capacity()];
+ if (remByte != -1){
+ int remBytesNum = remByte + 1;
+ System.arraycopy(lineSeparator, 0, b, 0, remBytesNum);
+ length = input.read(b, remBytesNum, b.length - remBytesNum);
+ remByte = -1;
+ } else {
+ length = input.read(b);
+ }
+ underlyingBuffer.put(b);
+ }
+ }
+
+
+ /**
+ * Read more data into the buffer. Will also manage split end conditions.
+ *
+ * @throws IOException for input file read errors
+ */
+ private void updateBuffer() throws IOException {
+ streamPos = seekable.getPos();
+ underlyingBuffer.clear();
+
+ if (endFound) {
+ length = -1;
+ return;
+ }
+
+ read();
+
+ // check our data read allowance.
+ if (streamPos + length >= this.endPos) {
+ updateLengthBasedOnConstraint();
+ }
+
+ charCount += bufferPtr;
+ bufferPtr = 1;
+
+ buffer.writerIndex(underlyingBuffer.limit());
+ buffer.readerIndex(underlyingBuffer.position());
+ }
+
+ /**
+ * Checks to see if we can go over the end of our bytes constraint on the data. If so,
+ * adjusts so that we can only read to the last character of the first line that crosses
+ * the split boundary.
+ */
+ private void updateLengthBasedOnConstraint() {
+ final long max = bStart + length;
+ for (long m = bStart + (endPos - streamPos); m < max; m++) {
+ for (int i = 0; i < lineSeparator.length; i++) {
+ long mPlus = m + i;
+ if (mPlus < max) {
+ // we found a line separator and don't need to consult the next byte.
+ if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) {
+ length = (int) (mPlus - bStart) + 1;
+ endFound = true;
+ return;
+ }
+ } else {
+ // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read.
+ remByte = i;
+ length = length - i;
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Get next byte from stream. Also maintains the current line count. Will throw a
+ * {@link StreamFinishedPseudoException} when the stream has run out of bytes.
+ *
+ * @return next byte from stream.
+ * @throws IOException for input file read errors
+ */
+ public final byte nextChar() throws IOException {
+ byte byteChar = nextCharNoNewLineCheck();
+ int bufferPtrTemp = bufferPtr - 1;
+ if (byteChar == lineSeparator[0]) {
+ for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
+ if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) {
+ return byteChar;
+ }
+ }
+
+ lineCount++;
+ byteChar = normalizedLineSeparator;
+
+ // we don't need to update buffer position if line separator is one byte long
+ if (lineSeparator.length > 1) {
+ bufferPtr += (lineSeparator.length - 1);
+ if (bufferPtr >= length) {
+ if (length != -1) {
+ updateBuffer();
+ } else {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+ }
+ }
+ }
+
+ return byteChar;
+ }
+
+ /**
+   * Get the next byte from the stream. Does not maintain the line count. Will throw a
+   * {@link StreamFinishedPseudoException} when the stream has run out of bytes.
+ *
+ * @return next byte from stream.
+ * @throws IOException for input file read errors
+ */
+ public final byte nextCharNoNewLineCheck() throws IOException {
+
+ if (length == -1) {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+
+ rangeCheck(buffer, bufferPtr - 1, bufferPtr);
+
+ byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr);
+
+ if (bufferPtr >= length) {
+ if (length != -1) {
+ updateBuffer();
+ bufferPtr--;
+ } else {
+ throw StreamFinishedPseudoException.INSTANCE;
+ }
+ }
+
+ bufferPtr++;
+ return byteChar;
+ }
+
+ /**
+ * Number of lines read since the start of this split.
+ * @return current line count
+ */
+ public final long lineCount() {
+ return lineCount;
+ }
+
+ /**
+ * Skip forward the number of line delimiters. If you are in the middle of a line,
+ * a value of 1 will skip to the start of the next record.
+ *
+ * @param lines Number of lines to skip.
+ * @throws IOException for input file read errors
+ * @throws IllegalArgumentException if unable to skip the requested number
+ * of lines
+ */
+ public final void skipLines(int lines) throws IOException {
+ if (lines < 1) {
+ return;
+ }
+ long expectedLineCount = this.lineCount + lines;
+
+ try {
+ do {
+ nextChar();
+ } while (lineCount < expectedLineCount);
+ if (lineCount < lines) {
+ throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+ }
+ } catch (EOFException ex) {
+ throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+ }
+ }
+
+ public final long charCount() {
+ return charCount + bufferPtr;
+ }
+
+ public long getLineCount() {
+ return lineCount;
+ }
+
+ public void close() throws IOException{
+ input.close();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
new file mode 100644
index 000000000..48c184991
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+
+/**
+ * Base class for producing output record batches while dealing with
+ * text files. Defines the interface called from text parsers to create
+ * the corresponding value vectors (record batch).
+ */
+
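+// The parser drives an implementation through a fixed call sequence per record
+// (a descriptive sketch of the protocol, not additional API):
+//
+//   output.startRecord();
+//   output.startField(0);
+//   output.append(b);        // repeated, one byte of the field at a time
+//   output.endField();       // or endEmptyField() for a field with no data
+//   // startField/append/endField repeat for each remaining field
+//   output.finishRecord();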
+abstract class TextOutput {
+
+ public abstract void startRecord();
+
+ /**
+ * Start processing a new field within a record.
+ * @param index index within the record
+ */
+ public abstract void startField(int index);
+
+ /**
+ * End processing a field within a record.
+ * @return true if engine should continue processing record. false if rest of record can be skipped.
+ */
+ public abstract boolean endField();
+
+ /**
+   * Shortcut that lets the output know that we are ending a field with no data.
+ * @return true if engine should continue processing record. false if rest of record can be skipped.
+ */
+ public abstract boolean endEmptyField();
+
+ /**
+ * Add the provided data but drop any whitespace.
+ * @param data character to append
+ */
+  public void appendIgnoringWhitespace(byte data) {
+    if (! TextReader.isWhite(data)) {
+      append(data);
+    }
+  }
+
+ /**
+ * Appends a byte to the output character data buffer
+ * @param data current byte read
+ */
+ public abstract void append(byte data);
+
+ /**
+ * Completes the processing of a given record. Also completes the processing of the
+ * last field being read.
+ */
+ public abstract void finishRecord();
+
+ /**
+ * Return the total number of records (across batches) processed
+ */
+ public abstract long getRecordCount();
+
+ /**
+ * Indicates if the current batch is full and reading for this batch
+ * should stop.
+ *
+ * @return true if the batch is full and the reader must exit to allow
+ * the batch to be sent downstream, false if the reader may continue to
+ * add rows to the current batch
+ */
+
+ public abstract boolean isFull();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
new file mode 100644
index 000000000..86cad4c88
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import java.io.IOException;
+
+import com.univocity.parsers.common.ParsingContext;
+
+class TextParsingContext implements ParsingContext {
+
+ private final TextInput input;
+ private final TextOutput output;
+ protected boolean stopped;
+
+ private int[] extractedIndexes;
+
+ public TextParsingContext(TextInput input, TextOutput output) {
+ this.input = input;
+ this.output = output;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop() {
+ stopped = true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long currentLine() {
+ return input.lineCount();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long currentChar() {
+ return input.charCount();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int currentColumn() {
+ return -1;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String[] headers() {
+ return new String[]{};
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int[] extractedFieldIndexes() {
+ return extractedIndexes;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long currentRecord() {
+ return output.getRecordCount();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String currentParsedContent() {
+ try {
+ return input.getStringSinceMarkForError();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void skipLines(int lines) {
+ }
+
+ @Override
+ public boolean columnsReordered() {
+ return false;
+ }
+
+ public boolean isFull() {
+ return output.isFull();
+ }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
new file mode 100644
index 000000000..0341b4554
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+
+// TODO: Remove the "V3" suffix once the V2 version is retired.
+public class TextParsingSettingsV3 {
+
+ public static final TextParsingSettingsV3 DEFAULT = new TextParsingSettingsV3();
+
+ private String emptyValue = null;
+ private boolean parseUnescapedQuotes = true;
+ private byte quote = b('"');
+ private byte quoteEscape = b('"');
+ private byte delimiter = b(',');
+ private byte comment = b('#');
+
+ private long maxCharsPerColumn = Character.MAX_VALUE;
+ private byte normalizedNewLine = b('\n');
+ private byte[] newLineDelimiter = {normalizedNewLine};
+ private boolean ignoreLeadingWhitespaces;
+ private boolean ignoreTrailingWhitespaces;
+ private String lineSeparatorString = "\n";
+ private boolean skipFirstLine;
+
+ private boolean headerExtractionEnabled;
+ private boolean useRepeatedVarChar = true;
+
+ public void set(TextFormatConfig config){
+ this.quote = bSafe(config.getQuote(), "quote");
+ this.quoteEscape = bSafe(config.getEscape(), "escape");
+ this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
+ this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
+ this.comment = bSafe(config.getComment(), "comment");
+ this.skipFirstLine = config.isSkipFirstLine();
+ this.headerExtractionEnabled = config.isHeaderExtractionEnabled();
+ if (this.headerExtractionEnabled) {
+      // When a header is present, the reader uses a set of VarChar vectors rather than a single RepeatedVarChar
+ this.useRepeatedVarChar = false;
+ }
+ }
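+
+  // A minimal configuration sketch (hypothetical values, not part of this
+  // patch), analogous to what set() derives from a TextFormatConfig:
+  //
+  //   TextParsingSettingsV3 settings = new TextParsingSettingsV3();
+  //   settings.setDelimiter((byte) '|');          // pipe-delimited file
+  //   settings.setHeaderExtractionEnabled(true);  // first line holds column names
+  //   settings.setUseRepeatedVarChar(false);      // one VarChar vector per header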
+
+ public byte getComment() {
+ return comment;
+ }
+
+ public boolean isSkipFirstLine() {
+ return skipFirstLine;
+ }
+
+ public void setSkipFirstLine(boolean skipFirstLine) {
+ this.skipFirstLine = skipFirstLine;
+ }
+
+ public boolean isUseRepeatedVarChar() {
+ return useRepeatedVarChar;
+ }
+
+ public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
+ this.useRepeatedVarChar = useRepeatedVarChar;
+ }
+
+ private static byte bSafe(char c, String name) {
+ if (c > Byte.MAX_VALUE) {
+ throw new IllegalArgumentException(String.format("Failure validating configuration option %s. Expected a "
+ + "character between 0 and 127 but value was actually %d.", name, (int) c));
+ }
+ return (byte) c;
+ }
+
+ private static byte b(char c) {
+ return (byte) c;
+ }
+
+ public byte[] getNewLineDelimiter() {
+ return newLineDelimiter;
+ }
+
+ /**
+ * Returns the character used for escaping values where the field delimiter is
+ * part of the value. Defaults to '"'
+ *
+ * @return the quote character
+ */
+ public byte getQuote() {
+ return quote;
+ }
+
+ /**
+ * Defines the character used for escaping values where the field delimiter is
+ * part of the value. Defaults to '"'
+ *
+ * @param quote
+ * the quote character
+ */
+ public void setQuote(byte quote) {
+ this.quote = quote;
+ }
+
+ public String getLineSeparatorString() {
+ return lineSeparatorString;
+ }
+
+ /**
+ * Identifies whether or not a given character is used for escaping values
+ * where the field delimiter is part of the value
+ *
+ * @param ch
+ * the character to be verified
+ * @return true if the given character is the character used for escaping
+ * values, false otherwise
+ */
+ public boolean isQuote(byte ch) {
+ return this.quote == ch;
+ }
+
+ /**
+ * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"'
+ * @return the quote escape character
+ */
+ public byte getQuoteEscape() {
+ return quoteEscape;
+ }
+
+ /**
+ * Defines the character used for escaping quotes inside an already quoted
+ * value. Defaults to '"'
+ *
+ * @param quoteEscape
+ * the quote escape character
+ */
+ public void setQuoteEscape(byte quoteEscape) {
+ this.quoteEscape = quoteEscape;
+ }
+
+ /**
+ * Identifies whether or not a given character is used for escaping quotes
+ * inside an already quoted value.
+ *
+ * @param ch
+ * the character to be verified
+ * @return true if the given character is the quote escape character, false
+ * otherwise
+ */
+ public boolean isQuoteEscape(byte ch) {
+ return this.quoteEscape == ch;
+ }
+
+ /**
+ * Returns the field delimiter character. Defaults to ','
+ * @return the field delimiter character
+ */
+ public byte getDelimiter() {
+ return delimiter;
+ }
+
+ /**
+ * Defines the field delimiter character. Defaults to ','
+ * @param delimiter the field delimiter character
+ */
+ public void setDelimiter(byte delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ /**
+ * Identifies whether or not a given character represents a field delimiter
+ * @param ch the character to be verified
+ * @return true if the given character is the field delimiter character, false otherwise
+ */
+ public boolean isDelimiter(byte ch) {
+ return this.delimiter == ch;
+ }
+
+ /**
+ * Returns the String representation of an empty value (defaults to null)
+ *
+ * <p>
+   * When reading, if the parser does not read any character from the input and
+   * the input is within quotes, this empty value is used instead of an empty string
+ *
+ * @return the String representation of an empty value
+ */
+ public String getEmptyValue() {
+ return emptyValue;
+ }
+
+ /**
+ * Sets the String representation of an empty value (defaults to null)
+ *
+ * <p>
+   * When reading, if the parser does not read any character from the input and
+   * the input is within quotes, this empty value is used instead of an empty string
+ *
+ * @param emptyValue
+ * the String representation of an empty value
+ */
+ public void setEmptyValue(String emptyValue) {
+ this.emptyValue = emptyValue;
+ }
+
+ /**
+ * Indicates whether the CSV parser should accept unescaped quotes inside
+ * quoted values and parse them normally. Defaults to {@code true}.
+ *
+ * @return a flag indicating whether or not the CSV parser should accept
+ * unescaped quotes inside quoted values.
+ */
+ public boolean isParseUnescapedQuotes() {
+ return parseUnescapedQuotes;
+ }
+
+ /**
+ * Configures how to handle unescaped quotes inside quoted values. If set to
+ * {@code true}, the parser will parse the quote normally as part of the
+   * value. If set to {@code false}, a
+ * {@link com.univocity.parsers.common.TextParsingException} will be thrown.
+ * Defaults to {@code true}.
+ *
+ * @param parseUnescapedQuotes
+ * indicates whether or not the CSV parser should accept unescaped
+ * quotes inside quoted values.
+ */
+ public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
+ this.parseUnescapedQuotes = parseUnescapedQuotes;
+ }
+
+ /**
+ * Indicates whether or not the first valid record parsed from the input
+ * should be considered as the row containing the names of each column
+ *
+ * @return true if the first valid record parsed from the input should be
+ * considered as the row containing the names of each column, false
+ * otherwise
+ */
+ public boolean isHeaderExtractionEnabled() {
+ return headerExtractionEnabled;
+ }
+
+ /**
+ * Defines whether or not the first valid record parsed from the input should
+ * be considered as the row containing the names of each column
+ *
+ * @param headerExtractionEnabled
+ * a flag indicating whether the first valid record parsed from the
+ * input should be considered as the row containing the names of each
+ * column
+ */
+ public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
+ this.headerExtractionEnabled = headerExtractionEnabled;
+ }
+
+ public long getMaxCharsPerColumn() {
+ return maxCharsPerColumn;
+ }
+
+ public void setMaxCharsPerColumn(long maxCharsPerColumn) {
+ this.maxCharsPerColumn = maxCharsPerColumn;
+ }
+
+ public void setComment(byte comment) {
+ this.comment = comment;
+ }
+
+ public byte getNormalizedNewLine() {
+ return normalizedNewLine;
+ }
+
+ public void setNormalizedNewLine(byte normalizedNewLine) {
+ this.normalizedNewLine = normalizedNewLine;
+ }
+
+ public boolean isIgnoreLeadingWhitespaces() {
+ return ignoreLeadingWhitespaces;
+ }
+
+ public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
+ this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
+ }
+
+ public boolean isIgnoreTrailingWhitespaces() {
+ return ignoreTrailingWhitespaces;
+ }
+
+ public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
+ this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
new file mode 100644
index 000000000..17a076c0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.UserException;
+
+import com.univocity.parsers.common.TextParsingException;
+
+/*******************************************************************************
+ * Portions Copyright 2014 uniVocity Software Pty Ltd
+ ******************************************************************************/
+
+/**
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
+ * DrillBuf support.
+ */
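+
+// A minimal read-loop sketch (hypothetical driver code, not part of this
+// patch): wire up the settings, input, and output, then pull records until
+// the input is exhausted or the output batch fills.
+//
+//   TextReader reader = new TextReader(settings, textInput, textOutput, workBuf);
+//   reader.start();
+//   while (! textOutput.isFull() && reader.parseNext()) {
+//     // each successful call parses one record into the output batch
+//   }
+//   reader.finishBatch();
+//   reader.close();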
+public final class TextReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
+
+ private static final byte NULL_BYTE = (byte) '\0';
+
+ public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+
+ private final TextParsingContext context;
+
+ private final TextParsingSettingsV3 settings;
+
+ private final TextInput input;
+ private final TextOutput output;
+ private final DrillBuf workBuf;
+
+ private byte ch;
+
+ // index of the field within this record
+ private int fieldIndex;
+
+ /** Behavior settings **/
+ private final boolean ignoreTrailingWhitespace;
+ private final boolean ignoreLeadingWhitespace;
+ private final boolean parseUnescapedQuotes;
+
+ /** Key Characters **/
+ private final byte comment;
+ private final byte delimiter;
+ private final byte quote;
+ private final byte quoteEscape;
+ private final byte newLine;
+
+ /**
+ * The CsvParser supports all settings provided by {@link TextParsingSettingsV3},
+ * and requires this configuration to be properly initialized.
+ * @param settings the parser configuration
+ * @param input input stream
+ * @param output interface to produce output record batch
+ * @param workBuf working buffer to handle whitespace
+ */
+ public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) {
+ this.context = new TextParsingContext(input, output);
+ this.workBuf = workBuf;
+ this.settings = settings;
+
+ this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces();
+ this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces();
+ this.parseUnescapedQuotes = settings.isParseUnescapedQuotes();
+ this.delimiter = settings.getDelimiter();
+ this.quote = settings.getQuote();
+ this.quoteEscape = settings.getQuoteEscape();
+ this.newLine = settings.getNormalizedNewLine();
+ this.comment = settings.getComment();
+
+ this.input = input;
+ this.output = output;
+ }
+
+ public TextOutput getOutput() { return output; }
+
+ /**
+   * Check if the given byte is a white space. As per the univocity text reader,
+   * any ASCII value <= ' ' is considered white space. However, since byte in Java
+   * is signed, an additional check ensures the value is not negative.
+ */
+ static final boolean isWhite(byte b){
+ return b <= ' ' && b > -1;
+ }
+
+ /**
+ * Inform the output interface to indicate we are starting a new record batch
+ */
+ public void resetForNextBatch() { }
+
+ public long getPos() { return input.getPos(); }
+
+ /**
+   * Parses an entire record, delegating parsing of the individual fields to
+   * the parseField() function.
+   * The start of the record is marked so that, if a failure is encountered
+   * (OOM, for example), the input stream can be reset to the marked position.
+ * @return true if parsing this record was successful; false otherwise
+ * @throws IOException for input file read errors
+ */
+ private boolean parseRecord() throws IOException {
+ final byte newLine = this.newLine;
+ final TextInput input = this.input;
+
+ input.mark();
+
+ fieldIndex = 0;
+ if (ignoreLeadingWhitespace && isWhite(ch)) {
+ skipWhitespace();
+ }
+
+ output.startRecord();
+ int fieldsWritten = 0;
+ try {
+ @SuppressWarnings("unused")
+ boolean earlyTerm = false;
+ while (ch != newLine) {
+ earlyTerm = ! parseField();
+ fieldsWritten++;
+ if (ch != newLine) {
+ ch = input.nextChar();
+ if (ch == newLine) {
+ output.startField(fieldsWritten++);
+ output.endEmptyField();
+ break;
+ }
+ }
+
+ // Disabling early termination. See DRILL-5914
+
+// if (earlyTerm) {
+// if (ch != newLine) {
+// input.skipLines(1);
+// }
+// break;
+// }
+ }
+ output.finishRecord();
+ } catch (StreamFinishedPseudoException e) {
+
+ // if we've written part of a field or all of a field, we should send this row.
+
+ if (fieldsWritten == 0) {
+ throw e;
+ } else {
+ output.finishRecord();
+ }
+ }
+ return true;
+ }
+
+ /**
+   * Parses an individual field, ignoring any white space encountered
+   * by not appending it to the output vector.
+ * @throws IOException for input file read errors
+ */
+ private void parseValueIgnore() throws IOException {
+ final byte newLine = this.newLine;
+ final byte delimiter = this.delimiter;
+ final TextInput input = this.input;
+
+ byte ch = this.ch;
+ while (ch != delimiter && ch != newLine) {
+ appendIgnoringWhitespace(ch);
+ ch = input.nextChar();
+ }
+ this.ch = ch;
+ }
+
+ public void appendIgnoringWhitespace(byte data) {
+ if (! isWhite(data)) {
+ output.append(data);
+ }
+ }
+
+ /**
+   * Parses an individual field and appends all characters up to the delimiter (or newline)
+   * to the output, including white space.
+ * @throws IOException for input file read errors
+ */
+ private void parseValueAll() throws IOException {
+ final byte newLine = this.newLine;
+ final byte delimiter = this.delimiter;
+ final TextOutput output = this.output;
+ final TextInput input = this.input;
+
+ byte ch = this.ch;
+ while (ch != delimiter && ch != newLine) {
+ output.append(ch);
+ ch = input.nextChar();
+ }
+ this.ch = ch;
+ }
+
+ /**
+ * Function simply delegates the parsing of a single field to the actual
+ * implementation based on parsing config
+ *
+ * @throws IOException
+ * for input file read errors
+ */
+ private void parseValue() throws IOException {
+ if (ignoreTrailingWhitespace) {
+ parseValueIgnore();
+ } else {
+ parseValueAll();
+ }
+ }
+
+ /**
+ * Recursive function invoked when a quote is encountered. Function also
+ * handles the case when there are non-white space characters in the field
+ * after the quoted value.
+ * @param prev previous byte read
+ * @throws IOException for input file read errors
+ */
+ private void parseQuotedValue(byte prev) throws IOException {
+ final byte newLine = this.newLine;
+ final byte delimiter = this.delimiter;
+ final TextOutput output = this.output;
+ final TextInput input = this.input;
+ final byte quote = this.quote;
+
+ ch = input.nextCharNoNewLineCheck();
+
+ while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
+ if (ch != quote) {
+ if (prev == quote) { // unescaped quote detected
+ if (parseUnescapedQuotes) {
+ output.append(quote);
+ output.append(ch);
+ parseQuotedValue(ch);
+ break;
+ } else {
+ throw new TextParsingException(
+ context,
+ "Unescaped quote character '"
+ + quote
+ + "' inside quoted value of CSV field. To allow unescaped quotes, "
+ + "set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. "
+ + "Cannot parse CSV input.");
+ }
+ }
+ output.append(ch);
+ prev = ch;
+ } else if (prev == quoteEscape) {
+ output.append(quote);
+ prev = NULL_BYTE;
+ } else {
+ prev = ch;
+ }
+ ch = input.nextCharNoNewLineCheck();
+ }
+
+    // Handles whitespace after a quoted value:
+    // Whitespace is ignored (i.e., ch <= ' ') unless it is also used as the delimiter (i.e., ch != delimiter).
+    // For example, in tab-separated (TSV) files, '\t' is the delimiter and must not be ignored.
+    // Content after the whitespace may be parsed if 'parseUnescapedQuotes' is enabled.
+ if (ch != newLine && ch <= ' ' && ch != delimiter) {
+ final DrillBuf workBuf = this.workBuf;
+ workBuf.resetWriterIndex();
+ do {
+ // saves whitespace after value
+ workBuf.writeByte(ch);
+ ch = input.nextChar();
+ // found a new line, go to next record.
+ if (ch == newLine) {
+ return;
+ }
+ } while (ch <= ' ' && ch != delimiter);
+
+ // there's more stuff after the quoted value, not only empty spaces.
+ if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) {
+
+ output.append(quote);
+ for(int i =0; i < workBuf.writerIndex(); i++){
+ output.append(workBuf.getByte(i));
+ }
+ // the next character is not the escape character, put it there
+ if (ch != quoteEscape) {
+ output.append(ch);
+ }
+ // sets this character as the previous character (may be escaping)
+ // calls recursively to keep parsing potentially quoted content
+ parseQuotedValue(ch);
+ }
+ }
+
+ if (!(ch == delimiter || ch == newLine)) {
+ throw new TextParsingException(context, "Unexpected character '" + ch
+ + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input.");
+ }
+ }
+
+ /**
+   * Parses a single field in its entirety, delegating to the appropriate routine based on the field's first character.
+ * @return true if more rows can be read, false if not
+ * @throws IOException for input file read errors
+ */
+ private final boolean parseField() throws IOException {
+
+ output.startField(fieldIndex++);
+
+ if (isWhite(ch) && ignoreLeadingWhitespace) {
+ skipWhitespace();
+ }
+
+ // Delimiter? Then this is an empty field.
+
+ if (ch == delimiter) {
+ return output.endEmptyField();
+ }
+
+ // Have the first character of the field. Parse and save the
+ // field, even if we hit EOF. (An EOF identifies a last line
+ // that contains data, but is not terminated with a newline.)
+
+ try {
+ if (ch == quote) {
+ parseQuotedValue(NULL_BYTE);
+ } else {
+ parseValue();
+ }
+ return output.endField();
+ } catch (StreamFinishedPseudoException e) {
+ return output.endField();
+ }
+ }
+
+ /**
+   * Helper function to skip white space at the current position in the input stream.
+ * @throws IOException for input file read errors
+ */
+ private void skipWhitespace() throws IOException {
+ final byte delimiter = this.delimiter;
+ final byte newLine = this.newLine;
+ final TextInput input = this.input;
+
+ while (isWhite(ch) && ch != delimiter && ch != newLine) {
+ ch = input.nextChar();
+ }
+ }
+
+ /**
+ * Starting point for the reader. Sets up the input interface.
+ * @throws IOException for input file read errors
+ */
+ public final void start() throws IOException {
+ context.stopped = false;
+ input.start();
+ }
+
+ /**
+   * Parses the next record from the input. Skips the line if it is a comment;
+   * this is required when the file contains headers.
+ * @throws IOException for input file read errors
+ */
+ public final boolean parseNext() throws IOException {
+ try {
+ while (! context.stopped) {
+ ch = input.nextChar();
+ if (ch == comment) {
+ input.skipLines(1);
+ continue;
+ }
+ break;
+ }
+ final long initialLineNumber = input.lineCount();
+ boolean success = parseRecord();
+ if (initialLineNumber + 1 < input.lineCount()) {
+ throw new TextParsingException(context, "Cannot use newline character within quoted string");
+ }
+
+ return success;
+ } catch (UserException ex) {
+ stopParsing();
+ throw ex;
+ } catch (StreamFinishedPseudoException ex) {
+ stopParsing();
+ return false;
+ } catch (Exception ex) {
+ try {
+ throw handleException(ex);
+ } finally {
+ stopParsing();
+ }
+ }
+ }
+
+ private void stopParsing() { }
+
+ private String displayLineSeparators(String str, boolean addNewLine) {
+ if (addNewLine) {
+ if (str.contains("\r\n")) {
+ str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t");
+ } else if (str.contains("\n")) {
+ str = str.replaceAll("\\n", "[\\\\n]\n\t");
+ } else {
+ str = str.replaceAll("\\r", "[\\\\r]\r\t");
+ }
+ } else {
+ str = str.replaceAll("\\n", "\\\\n");
+ str = str.replaceAll("\\r", "\\\\r");
+ }
+ return str;
+ }
+
+ /**
+ * Helper method to handle exceptions caught while processing text files and generate better error messages associated with
+ * the exception.
+ * @param ex Exception raised
+ * @throws IOException for input file read errors
+ */
+ private TextParsingException handleException(Exception ex) throws IOException {
+
+ if (ex instanceof TextParsingException) {
+ throw (TextParsingException) ex;
+ }
+
+ String message = null;
+ String tmp = input.getStringSinceMarkForError();
+ char[] chars = tmp.toCharArray();
+ if (chars != null) {
+ int length = chars.length;
+ if (length > settings.getMaxCharsPerColumn()) {
+ message = "Length of parsed input (" + length
+ + ") exceeds the maximum number of characters defined in your parser settings ("
+ + settings.getMaxCharsPerColumn() + "). ";
+ }
+
+ if (tmp.contains("\n") || tmp.contains("\r")) {
+ tmp = displayLineSeparators(tmp, true);
+ String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false);
+ message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '"
+ + lineSeparator + "'. Parsed content:\n\t" + tmp;
+ }
+
+ int nullCharacterCount = 0;
+ // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError
+ int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length;
+ StringBuilder s = new StringBuilder(maxLength);
+ for (int i = 0; i < maxLength; i++) {
+ if (chars[i] == '\0') {
+ s.append('\\');
+ s.append('0');
+ nullCharacterCount++;
+ } else {
+ s.append(chars[i]);
+ }
+ }
+ tmp = s.toString();
+
+ if (nullCharacterCount > 0) {
+ message += "\nIdentified "
+ + nullCharacterCount
+ + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t"
+ + tmp;
+ }
+ }
+
+ UserException.Builder builder;
+ if (ex instanceof UserException) {
+ builder = ((UserException) ex).rebuild();
+ } else {
+ builder = UserException
+ .dataReadError(ex)
+ .message(message);
+ }
+ throw builder
+ .addContext("Line", context.currentLine())
+ .addContext("Record", context.currentRecord())
+ .build(logger);
+ }
+
+ /**
+ * Finish the processing of a batch, indicates to the output
+ * interface to wrap up the batch
+ */
+ public void finishBatch() { }
+
+ /**
+ * Invoked once there are no more records and we are done with the
+ * current record reader to clean up state.
+ * @throws IOException for input file read errors
+ */
+ public void close() throws IOException {
+ input.close();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
new file mode 100644
index 000000000..aced5adfd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Version 3 of the text reader. Hosts the "compliant" text reader on
+ * the row set framework.
+ */
+package org.apache.drill.exec.store.easy.text.compliant.v3;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a379db175..37b8a4073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -593,7 +593,6 @@ public class Foreman implements Runnable {
new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
}
-
/**
* Manages the end-state processing for Foreman.
*
@@ -726,7 +725,6 @@ public class Foreman implements Runnable {
}
}
- @SuppressWarnings("resource")
@Override
public void close() {
Preconditions.checkState(!isClosed);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7e9415530..5443eea5f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -522,6 +522,8 @@ drill.exec.options: {
exec.queue.memory_reserve_ratio: 0.2,
exec.sort.disable_managed : false,
exec.storage.enable_new_text_reader: true,
+ exec.storage.enable_v3_text_reader: false,
+ exec.storage.min_width: 1,
exec.udf.enable_dynamic_support: true,
exec.udf.use_dynamic: true,
drill.exec.stats.logging.batch_size: false,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index e00e5dc60..87757782b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -128,6 +128,7 @@ public class TestPartitionFilter extends PlanTestBase {
String query = "select * from dfs.`multilevel/parquet` where (dir0=1994 and dir1='Q1' and o_custkey < 500) or (dir0=1995 and dir1='Q2' and o_custkey > 500)";
testIncludeFilter(query, 2, "Filter\\(", 8);
}
+
@Test //Parquet: partition filters are ANDed and belong to a top-level OR
public void testPartitionFilter3_Parquet_from_CTAS() throws Exception {
String query = "select * from dfs.tmp.parquet where (yr=1994 and qrtr='Q1' and o_custkey < 500) or (yr=1995 and qrtr='Q2' and o_custkey > 500)";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
index e5ac6d873..89df5986f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.fail;
import java.util.Iterator;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -50,12 +51,14 @@ import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
/**
* Test the implementation of the Drill Volcano iterator protocol that
* wraps the modular operator implementation.
*/
+@Category(RowSetTests.class)
public class TestOperatorRecordBatch extends SubOperatorTest {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubOperatorTest.class);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
index 88ccd3c41..2d066dee4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
@@ -64,7 +64,10 @@ public class TestColumnsArray extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
// ...and the columns array manager
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
index d1e91a283..2a5b00e9f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
@@ -232,7 +232,10 @@ public class TestColumnsArrayParser extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
index a6de5e6f3..bbc5e19a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
@@ -45,7 +45,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
// Simulate SELECT a, b, c ...
@@ -70,7 +73,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
// Simulate SELECT a, fqn, filEPath, filename, suffix ...
@@ -114,7 +120,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
String dir0 = ScanTestUtils.partitionColName(0);
@@ -146,7 +155,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -171,6 +183,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
true, // Use legacy wildcard expansion
true, // Put partitions at end
new Path("hdfs:///w"),
+ 3, // Max partition depth is 3, though this "scan" sees only 2
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -178,12 +191,14 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Lists.newArrayList(metadataManager.projectionParser()));
List<ColumnProjection> cols = scanProj.columns();
- assertEquals(3, cols.size());
+ assertEquals(4, cols.size());
assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+ assertEquals(2, ((PartitionColumn) cols.get(3)).partition());
}
/**
@@ -199,6 +214,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
true, // Use legacy wildcard expansion
false, // Put partitions at end
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -230,6 +246,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
true, // Use legacy wildcard expansion
false, // Put partitions at end
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -262,7 +279,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -284,6 +304,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
true, // Use legacy wildcard expansion
true, // Put partitions at end
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -310,6 +331,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
true, // Use legacy wildcard expansion
false, // Put partitions at end
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -343,6 +365,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
true, // Use legacy wildcard expansion
true, // Put partitions at end
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -367,6 +390,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
true, // Use legacy wildcard expansion
false, // Put partitions at end
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -394,7 +418,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
index 314bc2a13..dae89ab0a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
@@ -163,7 +163,10 @@ public class TestFileMetadataProjection extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -249,7 +252,10 @@ public class TestFileMetadataProjection extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
@@ -300,7 +306,10 @@ public class TestFileMetadataProjection extends SubOperatorTest {
Path filePath = new Path("hdfs:///x/0/1/2/3/4/5/6/7/8/9/10/d11/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///x"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanLevelProjection scanProj = new ScanLevelProjection(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index b05bb28f3..7b9fbfbe4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -103,6 +103,7 @@ public class TestFileScanFramework extends SubOperatorTest {
public abstract static class BaseFileScanOpFixture extends AbstractScanOpFixture {
protected Path selectionRoot = MOCK_ROOT_PATH;
+ protected int partitionDepth = 3;
protected List<FileWork> files = new ArrayList<>();
protected Configuration fsConfig = new Configuration();
@@ -116,7 +117,7 @@ public class TestFileScanFramework extends SubOperatorTest {
protected abstract BaseFileScanFramework<?> buildFramework();
private void configureFileScan(BaseFileScanFramework<?> framework) {
- framework.setSelectionRoot(selectionRoot);
+ framework.setSelectionRoot(selectionRoot, partitionDepth);
}
}
@@ -311,10 +312,11 @@ public class TestFileScanFramework extends SubOperatorTest {
.add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR)
.addNullable(ScanTestUtils.partitionColName(0), MinorType.VARCHAR)
.addNullable(ScanTestUtils.partitionColName(1), MinorType.VARCHAR)
+ .addNullable(ScanTestUtils.partitionColName(2), MinorType.VARCHAR)
.buildSchema();
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(30, "fred", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1)
- .addRow(40, "wilma", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1)
+ .addRow(30, "fred", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1, null)
+ .addRow(40, "wilma", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1, null)
.build();
RowSetComparison verifier = new RowSetComparison(expected);
assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
index c7b52e2da..7ee91a504 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
@@ -59,7 +59,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator());
@@ -124,7 +127,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
scanner.withMetadata(metadataManager);
@@ -193,7 +199,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
scanner.withMetadata(metadataManager);
@@ -269,7 +278,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePath));
scanner.withMetadata(metadataManager);
@@ -334,7 +346,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
Path filePathB = new Path("hdfs:///w/x/b.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePathA, filePathB));
scanner.withMetadata(metadataManager);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index 8adc0372a..cdfabc444 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -102,7 +102,10 @@ public class TestSchemaSmoothing extends SubOperatorTest {
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePathA, filePathB));
// Set up the scan level projection
@@ -580,7 +583,10 @@ public class TestSchemaSmoothing extends SubOperatorTest {
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePathA, filePathB));
// Set up the scan level projection
@@ -628,7 +634,10 @@ public class TestSchemaSmoothing extends SubOperatorTest {
Path filePathB = new Path("hdfs:///w/x/b.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePathA, filePathB));
// Set up the scan level projection
@@ -676,7 +685,10 @@ public class TestSchemaSmoothing extends SubOperatorTest {
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ false, // Don't expand partition columns for wildcard
+ false, // N/A
new Path("hdfs:///w"),
+ FileMetadataManager.AUTO_PARTITION_DEPTH,
Lists.newArrayList(filePathA, filePathB));
// Set up the scan level projection
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java
index 838c889f8..9c8bc9dd3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.validate;
import static org.junit.Assert.assertFalse;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
@@ -49,7 +48,6 @@ public class TestValidationOptions extends DrillTest {
.toConsole()
.logger(BatchValidator.class, Level.TRACE)
.logger(IteratorValidatorCreator.class, Level.TRACE)
- .logger(CompliantTextRecordReader.class, Level.TRACE)
.build();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
index 2966bd561..5fb046e32 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -353,9 +352,10 @@ public class TestProjectedTuple {
assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("foo"));
}
- @Test
- @Ignore("Drill syntax does not support map arrays")
- public void testMapArray() {
+ //@Test
+ //@Ignore("Drill syntax does not support map arrays")
+ @SuppressWarnings("unused")
+ private void testMapArray() {
RequestedTuple projSet = RequestedTupleImpl.parse(
RowSetTestUtils.projectList("a[1].x"));
List<RequestedColumn> cols = projSet.projections();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
new file mode 100644
index 000000000..056b8e45a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+
+public class BaseCsvTest extends ClusterTest {
+
+ protected static final String PART_DIR = "root";
+ protected static final String NESTED_DIR = "nested";
+ protected static final String ROOT_FILE = "first.csv";
+ protected static final String NESTED_FILE = "second.csv";
+
+ protected static String validHeaders[] = {
+ "a,b,c",
+ "10,foo,bar"
+ };
+
+ protected static String secondFile[] = {
+ "a,b,c",
+ "20,fred,wilma"
+ };
+
+ protected static File testDir;
+
+ protected static void setup(boolean skipFirstLine, boolean extractHeader) throws Exception {
+ setup(skipFirstLine, extractHeader, 1);
+ }
+
+ protected static void setup(boolean skipFirstLine, boolean extractHeader,
+ int maxParallelization) throws Exception {
+ startCluster(
+ ClusterFixture.builder(dirTestWatcher)
+ .maxParallelization(maxParallelization));
+
+ // Set up CSV storage plugin using headers.
+
+ TextFormatConfig csvFormat = new TextFormatConfig();
+ csvFormat.fieldDelimiter = ',';
+ csvFormat.skipFirstLine = skipFirstLine;
+ csvFormat.extractHeader = extractHeader;
+
+ testDir = cluster.makeDataDir("data", "csv", csvFormat);
+ }
+
+ protected static void buildNestedTable() throws IOException {
+
+ // Two-level partitioned table
+
+ File rootDir = new File(testDir, PART_DIR);
+ rootDir.mkdir();
+ buildFile(new File(rootDir, ROOT_FILE), validHeaders);
+ File nestedDir = new File(rootDir, NESTED_DIR);
+ nestedDir.mkdir();
+ buildFile(new File(nestedDir, NESTED_FILE), secondFile);
+ }
+
+ protected void enableV3(boolean enable) {
+ client.alterSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY, enable);
+ }
+
+ protected void resetV3() {
+ client.resetSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
+ }
+
+ protected static void buildFile(String fileName, String[] data) throws IOException {
+ buildFile(new File(testDir, fileName), data);
+ }
+
+ protected static void buildFile(File file, String[] data) throws IOException {
+ try(PrintWriter out = new PrintWriter(new FileWriter(file))) {
+ for (String line : data) {
+ out.println(line);
+ }
+ }
+ }
+
+}
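Subclasses drive each case through both readers using the enableV3()/resetV3() helpers above. An illustrative sketch of that driver pattern, mirroring the tests that follow (testSomething and doTestSomething are placeholder names):

    // Illustrative only: run the same verification against V2, then V3,
    // and always reset the session option afterward.
    @Test
    public void testSomething() throws IOException {
      try {
        enableV3(false);   // V2: the current "new" (compliant) text reader
        doTestSomething();
        enableV3(true);    // V3: the row-set based reader added in this patch
        doTestSomething();
      } finally {
        resetV3();
      }
    }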
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
deleted file mode 100644
index 25aa738eb..000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.text.compliant;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
-import org.apache.drill.test.ClusterFixture;
-import org.apache.drill.test.ClusterTest;
-import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSetBuilder;
-import org.apache.drill.test.rowSet.RowSetUtilities;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * SQL-level tests for CSV headers. See
- * {@link TestHeaderBuilder} for detailed unit tests.
- * This test does not attempt to duplicate all the cases
- * from the unit tests; instead it just does a sanity check.
- */
-
-public class TestCsv extends ClusterTest {
-
- private static final String CASE2_FILE_NAME = "case2.csv";
-
- private static File testDir;
-
- @BeforeClass
- public static void setup() throws Exception {
- startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
-
- // Set up CSV storage plugin using headers.
-
- TextFormatConfig csvFormat = new TextFormatConfig();
- csvFormat.fieldDelimiter = ',';
- csvFormat.skipFirstLine = false;
- csvFormat.extractHeader = true;
-
- testDir = cluster.makeDataDir("data", "csv", csvFormat);
- buildFile(CASE2_FILE_NAME, validHeaders);
- }
-
- private static String emptyHeaders[] = {
- "",
- "10,foo,bar"
- };
-
- @Test
- public void testEmptyCsvHeaders() throws IOException {
- String fileName = "case1.csv";
- buildFile(fileName, emptyHeaders);
- try {
- client.queryBuilder().sql(makeStatement(fileName)).run();
- fail();
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("must define at least one header"));
- }
- }
-
- private static String validHeaders[] = {
- "a,b,c",
- "10,foo,bar"
- };
-
- @Test
- public void testValidCsvHeaders() throws IOException {
- RowSet actual = client.queryBuilder().sql(makeStatement(CASE2_FILE_NAME)).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .add("b", MinorType.VARCHAR)
- .add("c", MinorType.VARCHAR)
- .buildSchema();
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", "foo", "bar")
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-
- private static String invalidHeaders[] = {
- "$,,9b,c,c,c_2",
- "10,foo,bar,fourth,fifth,sixth"
- };
-
- @Test
- public void testInvalidCsvHeaders() throws IOException {
- String fileName = "case3.csv";
- buildFile(fileName, invalidHeaders);
- RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("column_1", MinorType.VARCHAR)
- .add("column_2", MinorType.VARCHAR)
- .add("col_9b", MinorType.VARCHAR)
- .add("c", MinorType.VARCHAR)
- .add("c_2", MinorType.VARCHAR)
- .add("c_2_2", MinorType.VARCHAR)
- .buildSchema();
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", "foo", "bar", "fourth", "fifth", "sixth")
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-
- // Test fix for DRILL-5590
- @Test
- public void testCsvHeadersCaseInsensitive() throws IOException {
- String sql = "SELECT A, b, C FROM `dfs.data`.`%s`";
- RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("A", MinorType.VARCHAR)
- .add("b", MinorType.VARCHAR)
- .add("C", MinorType.VARCHAR)
- .buildSchema();
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", "foo", "bar")
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-
- private String makeStatement(String fileName) {
- return "SELECT * FROM `dfs.data`.`" + fileName + "`";
- }
-
- private static void buildFile(String fileName, String[] data) throws IOException {
- try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
- for (String line : data) {
- out.println(line);
- }
- }
- }
-
- /**
- * Verify that the wildcard expands columns to the header names, including
- * case
- */
- @Test
- public void testWildcard() throws IOException {
- String sql = "SELECT * FROM `dfs.data`.`%s`";
- RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .add("b", MinorType.VARCHAR)
- .add("c", MinorType.VARCHAR)
- .buildSchema();
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", "foo", "bar")
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-
- /**
- * Verify that implicit columns are recognized and populated. Sanity test
- * of just one implicit column.
- */
- @Test
- public void testImplicitColsExplicitSelect() throws IOException {
- String sql = "SELECT A, filename FROM `dfs.data`.`%s`";
- RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("A", MinorType.VARCHAR)
- .addNullable("filename", MinorType.VARCHAR)
- .buildSchema();
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", CASE2_FILE_NAME)
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-
- /**
- * Verify that implicit columns are recognized and populated. Sanity test
- * of just one implicit column.
- */
- @Test
- public void testImplicitColsWildcard() throws IOException {
- String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
- RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .add("b", MinorType.VARCHAR)
- .add("c", MinorType.VARCHAR)
- .addNullable("filename", MinorType.VARCHAR)
- .buildSchema();
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", "foo", "bar", CASE2_FILE_NAME)
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-
- /**
- * CSV does not allow explicit use of dir0, dir1, etc. columns. Treated
- * as undefined nullable int columns.
- * <p>
- * Note that the class path storage plugin does not support directories
- * (partitions). It is unclear if that should show up here as the
- * partition column names being undefined (hence Nullable INT) or should
- * they still be defined, but set to a null Nullable VARCHAR?
- */
- @Test
- public void testPartitionColsWildcard() throws IOException {
- String sql = "SELECT *, dir0, dir5 FROM `dfs.data`.`%s`";
- RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .add("b", MinorType.VARCHAR)
- .add("c", MinorType.VARCHAR)
- .addNullable("dir0", MinorType.INT)
- .addNullable("dir5", MinorType.INT)
- .buildSchema();
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", "foo", "bar", null, null)
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-
- /**
- * CSV does not allow explicit use of dir0, dir1, etc. columns. Treated
- * as undefined nullable int columns.
- */
- @Test
- public void testPartitionColsExplicit() throws IOException {
- String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`";
- RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .addNullable("dir0", MinorType.INT)
- .addNullable("dir5", MinorType.INT)
- .buildSchema();
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("10", null, null)
- .build();
- RowSetUtilities.verify(expected, actual);
- }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
new file mode 100644
index 000000000..d983f87e3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+// CSV reader now hosted on the row set framework
+@Category(RowSetTests.class)
+public class TestCsvIgnoreHeaders extends BaseCsvTest {
+
+ private static String withHeaders[] = {
+ "a,b,c",
+ "10,foo,bar",
+ "20,fred,wilma"
+ };
+
+ private static String raggedRows[] = {
+ "a,b,c",
+ "10,dino",
+ "20,foo,bar",
+ "30"
+ };
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ BaseCsvTest.setup(true, false);
+ }
+
+ @Test
+ public void testColumns() throws IOException {
+ try {
+ enableV3(false);
+ doTestColumns();
+ enableV3(true);
+ doTestColumns();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestColumns() throws IOException {
+ String fileName = "simple.csv";
+ buildFile(fileName, withHeaders);
+ String sql = "SELECT columns FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addSingleCol(strArray("10", "foo", "bar"))
+ .addSingleCol(strArray("20", "fred", "wilma"))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testRaggedRows() throws IOException {
+ try {
+ enableV3(false);
+ doTestRaggedRows();
+ enableV3(true);
+ doTestRaggedRows();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestRaggedRows() throws IOException {
+ String fileName = "ragged.csv";
+ TestCsvWithHeaders.buildFile(new File(testDir, fileName), raggedRows);
+ String sql = "SELECT columns FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addSingleCol(strArray("10", "dino"))
+ .addSingleCol(strArray("20", "foo", "bar"))
+ .addSingleCol(strArray("30"))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
new file mode 100644
index 000000000..655d04db6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Sanity test of CSV files with headers. Tests both the original
+ * "compliant" version and the V3 version based on the row set
+ * framework.
+ * <p>
+ * The CSV reader is a "canary in the coal mine" for many scan features.
+ * It turns out that there are several bugs in "V2" (AKA "new text reader")
+ * that are fixed in "V3" (the one based on the row set framework), and one
+ * that is not yet fixed.
+ *
+ * <ul>
+ * <li>Ragged rows will crash the V2 text reader when headers are used.
+ * No V2 test exists as a result. Fixed in V3.</li>
+ * <li>DRILL-7083: in V2, if files are nested to 2 levels, but we ask
+ * for dir2 (the non-existent third level), the type of dir2 will be
+ * nullable INT. In V3, the type is Nullable VARCHAR (just like for the
+ * existing partition levels.)</li>
+ * <li>DRILL-7080: A query like SELECT *, dir0 produces the result schema
+ * of (dir0, a, b, ...) in V2 and (a, b, ... dir0, dir00) in V3. This
+ * seems to be a bug in the Project operator.</li>
+ * </ul>
+ *
+ * The V3 tests all demonstrate that the row set scan framework
+ * delivers a first empty batch from each scan. I (Paul) had understood
+ * that we had a "fast schema" path as the result of the "empty batch"
+ * project. However, the V2 reader does not provide the schema-only
+ * first batch, so it is not clear whether delivering it is a feature or a
+ * bug introduced when behavior changed. It is easy enough to change if we
+ * choose to; if so, the tests here should drop the check for that
+ * schema-only batch.
+ * <p>
+ * Tests are run for both V2 and V3. When the results are the same,
+ * the test occurs once, wrapped in a "driver" to select V2 or V3 mode.
+ * When behavior differs, there are separate tests for V2 and V3.
+ * <p>
+ * The V2 tests are temporary. Once we accept that V3 is stable, we
+ * can remove V2 (and the "old text reader"). The behavior in V3 is
+ * more correct; there is no reason to keep the old, broken behavior.
+ *
+ * @see {@link TestHeaderBuilder}
+ */
+
+// CSV reader now hosted on the row set framework
+@Category(RowSetTests.class)
+public class TestCsvWithHeaders extends BaseCsvTest {
+
+ private static final String TEST_FILE_NAME = "case2.csv";
+
+ private static String invalidHeaders[] = {
+ "$,,9b,c,c,c_2",
+ "10,foo,bar,fourth,fifth,sixth"
+ };
+
+ private static String emptyHeaders[] = {
+ "",
+ "10,foo,bar"
+ };
+
+ private static String raggedRows[] = {
+ "a,b,c",
+ "10,dino",
+ "20,foo,bar",
+ "30"
+ };
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ BaseCsvTest.setup(false, true);
+ buildFile(TEST_FILE_NAME, validHeaders);
+ buildNestedTable();
+ }
+
+ private static final String EMPTY_FILE = "empty.csv";
+
+ @Test
+ public void testEmptyFile() throws IOException {
+ buildFile(EMPTY_FILE, new String[] {});
+ try {
+ enableV3(false);
+ doTestEmptyFile();
+ enableV3(true);
+ doTestEmptyFile();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestEmptyFile() throws IOException {
+ RowSet rowSet = client.queryBuilder().sql(makeStatement(EMPTY_FILE)).rowSet();
+ assertNull(rowSet);
+ }
+
+ private static final String EMPTY_HEADERS_FILE = "noheaders.csv";
+
+ /**
+ * Trivial case: empty header. This case should fail.
+ */
+
+ @Test
+ public void testEmptyCsvHeaders() throws IOException {
+ buildFile(EMPTY_HEADERS_FILE, emptyHeaders);
+ try {
+ enableV3(false);
+ doTestEmptyCsvHeaders();
+ enableV3(true);
+ doTestEmptyCsvHeaders();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestEmptyCsvHeaders() throws IOException {
+ try {
+ client.queryBuilder().sql(makeStatement(EMPTY_HEADERS_FILE)).run();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("must define at least one header"));
+ }
+ }
+
+ @Test
+ public void testValidCsvHeaders() throws IOException {
+ try {
+ enableV3(false);
+ doTestValidCsvHeaders();
+ enableV3(true);
+ doTestValidCsvHeaders();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestValidCsvHeaders() throws IOException {
+ RowSet actual = client.queryBuilder().sql(makeStatement(TEST_FILE_NAME)).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testInvalidCsvHeaders() throws IOException {
+ try {
+ enableV3(false);
+ doTestInvalidCsvHeaders();
+ enableV3(true);
+ doTestInvalidCsvHeaders();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestInvalidCsvHeaders() throws IOException {
+ String fileName = "case3.csv";
+ buildFile(fileName, invalidHeaders);
+ RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("column_1", MinorType.VARCHAR)
+ .add("column_2", MinorType.VARCHAR)
+ .add("col_9b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .add("c_2", MinorType.VARCHAR)
+ .add("c_2_2", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", "fourth", "fifth", "sixth")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testCsvHeadersCaseInsensitive() throws IOException {
+ try {
+ enableV3(false);
+ doTestCsvHeadersCaseInsensitive();
+ enableV3(true);
+ doTestCsvHeadersCaseInsensitive();
+ } finally {
+ resetV3();
+ }
+ }
+
+ // Test fix for DRILL-5590
+ private void doTestCsvHeadersCaseInsensitive() throws IOException {
+ String sql = "SELECT A, b, C FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ private String makeStatement(String fileName) {
+ return "SELECT * FROM `dfs.data`.`" + fileName + "`";
+ }
+
+ @Test
+ public void testWildcard() throws IOException {
+ try {
+ enableV3(false);
+ doTestWildcard();
+ enableV3(true);
+ doTestWildcard();
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Verify that the wildcard expands columns to the header names, including
+ * case
+ */
+ private void doTestWildcard() throws IOException {
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ /**
+ * Verify that implicit columns are recognized and populated. Sanity test
+ * of just one implicit column. V2 uses nullable VARCHAR for file
+ * metadata columns.
+ */
+
+ @Test
+ public void testImplicitColsExplicitSelectV2() throws IOException {
+ try {
+ enableV3(false);
+ String sql = "SELECT A, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .addNullable("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Verify that implicit columns are recognized and populated. Sanity test
+ * of just one implicit column. V3 uses non-nullable VARCHAR for file
+ * metadata columns.
+ */
+
+ @Test
+ public void testImplicitColsExplicitSelectV3() throws IOException {
+ try {
+ enableV3(true);
+ String sql = "SELECT A, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Verify that implicit columns are recognized and populated. Sanity test
+ * of just one implicit column. V2 uses nullable VARCHAR for file
+ * metadata columns.
+ */
+
+ @Test
+ public void testImplicitColWildcardV2() throws IOException {
+ try {
+ enableV3(false);
+ String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Verify that implicit columns are recognized and populated. Sanity test
+ * of just one implicit column. V3 uses non-nullable VARCHAR for file
+ * metadata columns.
+ */
+
+ @Test
+ public void testImplicitColWildcardV3() throws IOException {
+ try {
+ enableV3(true);
+ String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .add("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ @Test
+ public void testColsWithWildcard() throws IOException {
+ try {
+ enableV3(false);
+ doTestColsWithWildcard();
+ enableV3(true);
+ doTestColsWithWildcard();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestColsWithWildcard() throws IOException {
+ String sql = "SELECT *, a as d FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", "10")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ /**
+ * V2 does not allow explicit use of dir0, dir1, etc. columns for a non-partitioned
+ * file. Treated as undefined nullable int columns.
+ */
+
+ @Test
+ public void testPartitionColsExplicitV2() throws IOException {
+ try {
+ enableV3(false);
+ String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.INT)
+ .addNullable("dir5", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", null, null)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * V3 allows the use of partition columns, even for a non-partitioned file.
+ * The columns are null, of type nullable VARCHAR. This area of Drill
+ * is a bit murky: it seems reasonable to support partition columns consistently
+ * rather than conditionally based on the structure of the input.
+ */
+ @Test
+ public void testPartitionColsExplicitV3() throws IOException {
+ try {
+ enableV3(true);
+ String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .addNullable("dir5", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", null, null)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ @Test
+ public void testDupColumn() throws IOException {
+ try {
+ enableV3(false);
+ doTestDupColumn();
+ enableV3(true);
+ doTestDupColumn();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestDupColumn() throws IOException {
+ String sql = "SELECT a, b, a FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("a0", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "10")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ // This test cannot be run for V2. The data gets corrupted and we get
+ // internal errors.
+
+ /**
+ * Test that ragged rows result in the "missing" columns being filled
+ * in with the moral equivalent of a null column for CSV: a blank string.
+ */
+ @Test
+ public void testRaggedRowsV3() throws IOException {
+ try {
+ enableV3(true);
+ String fileName = "case4.csv";
+ buildFile(fileName, raggedRows);
+ RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "dino", "")
+ .addRow("20", "foo", "bar")
+ .addRow("30", "", "")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Test partition expansion. Because the two files are read in the
+ * same scan operator, the schema is consistent. See
+ * {@link TestPartitionRace} for the multi-threaded race where all
+ * hell breaks loose.
+ * <p>
+ * V2, since Drill 1.12, puts partition columns ahead of data columns.
+ */
+ @Test
+ public void testPartitionExpansionV2() throws IOException {
+ try {
+ enableV3(false);
+
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("dir0", MinorType.VARCHAR)
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+
+ // Read the two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String col2 = reader.scalar(1).getString();
+ if (col2.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(null, "10", "foo", "bar")
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(NESTED_DIR, "20", "fred", "wilma")
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Test partition expansion in V3.
+ * <p>
+ * This test is tricky because it will return two data batches
+ * (preceded by an empty schema batch.) File read order is random
+ * so we have to expect the files in either order.
+ * <p>
+ * V3, as in V2 before Drill 1.12, puts partition columns after
+ * data columns (so that data columns don't shift positions if
+ * files are nested to another level.)
+ */
+ @Test
+ public void testPartitionExpansionV3() throws IOException {
+ try {
+ enableV3(true);
+
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .buildSchema();
+
+ // First batch is empty; just carries the schema.
+
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+ assertEquals(0, rowSet.rowCount());
+ rowSet.clear();
+
+ // Read the other two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String col1 = reader.scalar(0).getString();
+ if (col1.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("20", "fred", "wilma", NESTED_DIR)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Test the use of partition columns with the wildcard. This works for file
+ * metadata columns, but confuses the project operator when used for
+ * partition columns. DRILL-7080.
+ */
+ @Test
+ public void testWilcardAndPartitionsMultiFilesV2() throws IOException {
+ try {
+ enableV3(false);
+
+ String sql = "SELECT *, dir0, dir1 FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
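+ // The wildcard already expands dir0 for V2 (in the first position), so the
+ // explicit dir0 in the SELECT list is a duplicate that appears to be renamed
+ // to dir00 downstream (the DRILL-7080 confusion noted above); dir1 matches
+ // no directory level and so comes back as Drill's default nullable INT.
+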
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("dir0", MinorType.VARCHAR)
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("dir00", MinorType.VARCHAR)
+ .addNullable("dir1", MinorType.INT)
+ .buildSchema();
+
+ // Read the two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String aCol = reader.scalar("a").getString();
+ if (aCol.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(null, "10", "foo", "bar", null, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(NESTED_DIR, "20", "fred", "wilma", NESTED_DIR, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Test the use of partition columns with the wildcard. This works for file
+ * metadata columns, but confuses the project operator when used for
+ * partition columns. DRILL-7080. Still broken in V3 because this appears
+ * to be a Project operator issue, not a reader issue. Note that the
+ * partition column now moves after the data columns.
+ */
+ @Test
+ public void testWildcardAndPartitionsMultiFilesV3() throws IOException {
+ try {
+ enableV3(true);
+
+ String sql = "SELECT *, dir0, dir1 FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
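+ // In V3 the wildcard expansion puts the partition columns after the data
+ // columns. The four dir columns below are the wildcard expansion plus the
+ // explicitly requested dir0 and dir1, with the duplicates apparently renamed
+ // to dir00 and dir10 (still the DRILL-7080 Project issue).
+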
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .addNullable("dir1", MinorType.VARCHAR)
+ .addNullable("dir00", MinorType.VARCHAR)
+ .addNullable("dir10", MinorType.VARCHAR)
+ .buildSchema();
+
+ // First batch is empty; just carries the schema.
+
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+ RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
+ rowSet);
+
+ // Read the two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String aCol = reader.scalar("a").getString();
+ if (aCol.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", null, null, null, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("20", "fred", "wilma", NESTED_DIR, null, NESTED_DIR, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Test using partition columns with partitioned files in V2. Since the
+ * files are nested at most one level deep, dir0 is a nullable VARCHAR,
+ * but dir1 (which never matches a directory) is a nullable INT. Since both
+ * files are read in a single scan operator, the schema is consistent.
+ */
+ @Test
+ public void testExplicitPartitionsMultiFilesV2() throws IOException {
+ try {
+ enableV3(false);
+
+ String sql = "SELECT a, b, c, dir0, dir1 FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
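+ // dir1 never matches an actual directory level in this layout, so V2 types
+ // it as Drill's default nullable INT; compare with the V3 test below, where
+ // it is a nullable VARCHAR.
+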
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .addNullable("dir1", MinorType.INT)
+ .buildSchema();
+
+ // Read the two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String aCol = reader.scalar("a").getString();
+ if (aCol.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", null, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("20", "fred", "wilma", NESTED_DIR, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Test using partition columns with partitioned files in V3. Although the
+ * files are nested at most one level deep, both dir0 and dir1 are nullable VARCHAR.
+ * See {@link TestPartitionRace} to show that the types and schemas
+ * are consistent even when used across multiple scans.
+ */
+ @Test
+ public void testExplicitPartitionsMultiFilesV3() throws IOException {
+ try {
+ enableV3(true);
+
+ String sql = "SELECT a, b, c, dir0, dir1 FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .addNullable("dir1", MinorType.VARCHAR)
+ .buildSchema();
+
+ // First batch is empty; just carries the schema.
+
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+ RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(),
+ rowSet);
+
+ // Read the two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String aCol = reader.scalar("a").getString();
+ if (aCol.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", null, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("20", "fred", "wilma", NESTED_DIR, null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
new file mode 100644
index 000000000..6051875f2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ArrayReader;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+// CSV reader now hosted on the row set framework
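+// Each test runs against both the V2 ("new" text) reader and the V3 row-set
+// based reader; the enableV3()/resetV3() helpers inherited from BaseCsvTest
+// toggle the option that selects between them.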
+@Category(RowSetTests.class)
+public class TestCsvWithoutHeaders extends BaseCsvTest {
+
+ private static final String TEST_FILE_NAME = "simple.csv";
+
+ private static String sampleData[] = {
+ "10,foo,bar",
+ "20,fred,wilma"
+ };
+
+ private static String raggedRows[] = {
+ "10,dino",
+ "20,foo,bar",
+ "30"
+ };
+
+ private static String secondSet[] = {
+ "30,barney,betty"
+ };
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ BaseCsvTest.setup(false, false);
+
+ buildFile(TEST_FILE_NAME, sampleData);
+ buildNestedTableWithoutHeaders();
+ }
+
+ protected static void buildNestedTableWithoutHeaders() throws IOException {
+
+ // Two-level partitioned table
+
+ File rootDir = new File(testDir, PART_DIR);
+ rootDir.mkdir();
+ buildFile(new File(rootDir, ROOT_FILE), sampleData);
+ File nestedDir = new File(rootDir, NESTED_DIR);
+ nestedDir.mkdir();
+ buildFile(new File(nestedDir, NESTED_FILE), secondSet);
+ }
+
+ @Test
+ public void testWildcard() throws IOException {
+ try {
+ enableV3(false);
+ doTestWildcard();
+ enableV3(true);
+ doTestWildcard();
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Verify that the wildcard expands to the `columns` array.
+ */
+
+ private void doTestWildcard() throws IOException {
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addSingleCol(strArray("10", "foo", "bar"))
+ .addSingleCol(strArray("20", "fred", "wilma"))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testColumns() throws IOException {
+ try {
+ enableV3(false);
+ doTestColumns();
+ enableV3(true);
+ doTestColumns();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestColumns() throws IOException {
+ String sql = "SELECT columns FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addSingleCol(strArray("10", "foo", "bar"))
+ .addSingleCol(strArray("20", "fred", "wilma"))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testWildcardAndMetadataV2() throws IOException {
+ try {
+ enableV3(false);
+ String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .addNullable("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
+ .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ @Test
+ public void testWildcardAndMetadataV3() throws IOException {
+ try {
+ enableV3(true);
+ String sql = "SELECT *, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .add("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
+ .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ @Test
+ public void testColumnsAndMetadataV2() throws IOException {
+ try {
+ enableV3(false);
+ String sql = "SELECT columns, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .addNullable("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
+ .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ @Test
+ public void testColumnsAndMetadataV3() throws IOException {
+ try {
+ enableV3(true);
+ String sql = "SELECT columns, filename FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .add("filename", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME)
+ .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetV3();
+ }
+ }
+
+ @Test
+ public void testSpecificColumns() throws IOException {
+ try {
+ enableV3(false);
+ doTestSpecificColumns();
+ enableV3(true);
+ doTestSpecificColumns();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestSpecificColumns() throws IOException {
+ String sql = "SELECT columns[0], columns[2] FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("EXPR$0", MinorType.VARCHAR)
+ .addNullable("EXPR$1", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "bar")
+ .addRow("20", "wilma")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ @Test
+ public void testRaggedRows() throws IOException {
+ try {
+ enableV3(false);
+ doTestRaggedRows();
+ enableV3(true);
+ doTestRaggedRows();
+ } finally {
+ resetV3();
+ }
+ }
+
+ private void doTestRaggedRows() throws IOException {
+ String fileName = "ragged.csv";
+ buildFile(fileName, raggedRows);
+ String sql = "SELECT columns FROM `dfs.data`.`%s`";
+ RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addSingleCol(strArray("10", "dino"))
+ .addSingleCol(strArray("20", "foo", "bar"))
+ .addSingleCol(strArray("30"))
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ }
+
+ /**
+ * Test partition expansion. Because the two files are read in the
+ * same scan operator, the schema is consistent.
+ * <p>
+ * V2, since Drill 1.12, puts partition columns ahead of data columns.
+ */
+ @Test
+ public void testPartitionExpansionV2() throws IOException {
+ try {
+ enableV3(false);
+
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("dir0", MinorType.VARCHAR)
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+
+ // Read the two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ ArrayReader ar = reader.array(1);
+ assertTrue(ar.next());
+ String col1 = ar.scalar().getString();
+ if (col1.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(null, strArray("10", "foo", "bar"))
+ .addRow(null, strArray("20", "fred", "wilma"))
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(NESTED_DIR, strArray("30", "barney", "betty"))
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * Test partition expansion in V3.
+ * <p>
+ * V3, as in V2 before Drill 1.12, puts partition columns after
+ * data columns (so that data columns don't shift positions if
+ * files are nested to another level).
+ */
+ @Test
+ public void testPartitionExpansionV3() throws IOException {
+ try {
+ enableV3(true);
+
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .buildSchema();
+
+ // First batch is empty; just carries the schema.
+
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+ assertEquals(0, rowSet.rowCount());
+ rowSet.clear();
+
+ // Read the other two batches.
+
+ for (int i = 0; i < 2; i++) {
+ assertTrue(iter.hasNext());
+ rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ ArrayReader ar = reader.array(0);
+ assertTrue(ar.next());
+ String col1 = ar.scalar().getString();
+ if (col1.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(strArray("10", "foo", "bar"), null)
+ .addRow(strArray("20", "fred", "wilma"), null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(strArray("30", "barney", "betty"), NESTED_DIR)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
index 20bf79652..67429fbd7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
@@ -21,25 +21,32 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.easy.text.compliant.v3.HeaderBuilder;
import org.apache.drill.test.DrillTest;
+import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+/**
+ * Test the mechanism that builds column names from a set of CSV
+ * headers. The mechanism provides reasonable defaults for missing
+ * or invalid headers.
+ */
+
public class TestHeaderBuilder extends DrillTest {
@Test
public void testEmptyHeader() {
- HeaderBuilder hb = new HeaderBuilder();
- hb.startBatch();
+ Path dummyPath = new Path("file:/dummy.csv");
+ HeaderBuilder hb = new HeaderBuilder(dummyPath);
try {
hb.finishRecord();
} catch (UserException e) {
assertTrue(e.getMessage().contains("must define at least one header"));
}
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"");
try {
hb.finishRecord();
@@ -47,127 +54,107 @@ public class TestHeaderBuilder extends DrillTest {
assertTrue(e.getMessage().contains("must define at least one header"));
}
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb," ");
validateHeader(hb, new String[] {"column_1"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,",");
validateHeader(hb, new String[] {"column_1", "column_2"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb," , ");
validateHeader(hb, new String[] {"column_1", "column_2"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"a, ");
validateHeader(hb, new String[] {"a", "column_2"});
}
@Test
public void testWhiteSpace() {
- HeaderBuilder hb = new HeaderBuilder();
- hb.startBatch();
+ Path dummyPath = new Path("file:/dummy.csv");
+ HeaderBuilder hb = new HeaderBuilder(dummyPath);
parse(hb,"a");
validateHeader(hb, new String[] {"a"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb," a ");
validateHeader(hb, new String[] {"a"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb," a ");
validateHeader(hb, new String[] {"a"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"a,b,c");
validateHeader(hb, new String[] {"a","b","c"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb," a , b , c ");
validateHeader(hb, new String[] {"a","b","c"});
}
@Test
public void testSyntax() {
- HeaderBuilder hb = new HeaderBuilder();
- hb.startBatch();
+ Path dummyPath = new Path("file:/dummy.csv");
+ HeaderBuilder hb = new HeaderBuilder(dummyPath);
parse(hb,"a_123");
validateHeader(hb, new String[] {"a_123"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"a_123_");
validateHeader(hb, new String[] {"a_123_"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"az09_");
validateHeader(hb, new String[] {"az09_"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"+");
validateHeader(hb, new String[] {"column_1"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"+,-");
validateHeader(hb, new String[] {"column_1", "column_2"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"+9a");
validateHeader(hb, new String[] {"col_9a"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"9a");
validateHeader(hb, new String[] {"col_9a"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"a+b");
validateHeader(hb, new String[] {"a_b"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"a_b");
validateHeader(hb, new String[] {"a_b"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"EXPR$0");
validateHeader(hb, new String[] {"EXPR_0"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"(_-^-_)");
validateHeader(hb, new String[] {"col_______"});
}
@Test
public void testUnicode() {
- HeaderBuilder hb = new HeaderBuilder();
- hb.startBatch();
+ Path dummyPath = new Path("file:/dummy.csv");
+ HeaderBuilder hb = new HeaderBuilder(dummyPath);
parse(hb,"Αθήνα");
validateHeader(hb, new String[] {"Αθήνα"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"Москва");
validateHeader(hb, new String[] {"Москва"});
- hb = new HeaderBuilder();
- hb.startBatch();
+ hb = new HeaderBuilder(dummyPath);
parse(hb,"Paris,Αθήνα,Москва");
validateHeader(hb, new String[] {"Paris","Αθήνα","Москва"});
}
@@ -183,8 +170,8 @@ public class TestHeaderBuilder extends DrillTest {
}
private void testParser(String input, String[] expected) {
- HeaderBuilder hb = new HeaderBuilder();
- hb.startBatch();
+ Path dummyPath = new Path("file:/dummy.csv");
+ HeaderBuilder hb = new HeaderBuilder(dummyPath);
parse(hb,input);
hb.finishRecord();
validateHeader(hb, expected);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
new file mode 100644
index 000000000..6e98339ee
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Demonstrates a race condition inherent in the way that partition
+ * columns are currently implemented. Two files: one at the root directory,
+ * one down one level. Parallelization is forced to two. (Most tests use
+ * small files and both files end up being read in the same scanner, which
+ * masks the problem shown here.)
+ * <p>
+ * Depending on which file is read first, the output row may start with
+ * or without the partition column. Once the column occurs, it will
+ * persist.
+ * <p>
+ * The solution is to figure out the max partition depth in the
+ * EasySubScan rather than in each scan operator.
+ * <p>
+ * The tests here test both the "V2" (AKA "new text reader") which has
+ * many issues, and the "V3" (row-set-based version) that has fixes.
+ * <p>
+ * See DRILL-7082 for the multi-scan race (fixed in V3), and
+ * DRILL-7083 for the problem with partition columns returning nullable INT
+ * (also fixed in V3.)
+ */
+
+public class TestPartitionRace extends BaseCsvTest {
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ BaseCsvTest.setup(false, true, 2);
+
+ // Two-level partitioned table
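+ // Layout: PART_DIR/first.csv at the root and PART_DIR/NESTED_DIR/second.csv
+ // one level down; the directory names and file contents come from BaseCsvTest.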
+
+ File rootDir = new File(testDir, PART_DIR);
+ rootDir.mkdir();
+ buildFile(new File(rootDir, "first.csv"), validHeaders);
+ File nestedDir = new File(rootDir, NESTED_DIR);
+ nestedDir.mkdir();
+ buildFile(new File(nestedDir, "second.csv"), secondFile);
+ }
+
+ /**
+ * Oddly, when run in a single fragment, the files occur in a
+ * stable order, the partition column always appears, and it occupies
+ * the first column position.
+ */
+ @Test
+ public void testSingleScanV2() throws IOException {
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+
+ try {
+ enableV3(false);
+
+ // Track where the partition column lands. With a single scan fragment
+ // the result is stable, so no retry loop is needed here.
+
+ boolean sawMissingPartition = false;
+ boolean sawPartitionFirst = false;
+ boolean sawPartitionLast = false;
+
+ // Read the two batches.
+
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+ for (int j = 0; j < 2; j++) {
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+
+ // Check location of partition column
+
+ int posn = rowSet.schema().index("dir0");
+ if (posn == -1) {
+ sawMissingPartition = true;
+ } else if (posn == 0) {
+ sawPartitionFirst = true;
+ } else {
+ sawPartitionLast = true;
+ }
+ rowSet.clear();
+ }
+ assertFalse(iter.hasNext());
+
+ // When run in a single fragment, the partition column appears
+ // all the time, and is in the first column position.
+
+ assertFalse(sawMissingPartition);
+ assertTrue(sawPartitionFirst);
+ assertFalse(sawPartitionLast);
+ } finally {
+ resetV3();
+ client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
+ }
+ }
+
+ /**
+ * V3 provides the same schema for the single- and multi-scan
+ * cases.
+ */
+ @Test
+ public void testSingleScanV3() throws IOException {
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .buildSchema();
+
+ try {
+ enableV3(true);
+
+ // A single run suffices here: V3 produces a consistent schema.
+
+ // First batch is empty; just carries the schema.
+
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+ assertEquals(0, rowSet.rowCount());
+ rowSet.clear();
+
+ // Read the two batches.
+
+ for (int j = 0; j < 2; j++) {
+ assertTrue(iter.hasNext());
+ rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String col1 = reader.scalar("a").getString();
+ if (col1.equals("10")) {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("20", "fred", "wilma", NESTED_DIR)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ resetV3();
+ }
+ }
+
+ /**
+ * When forced to run in two fragments, the fun really starts. The
+ * partition column (usually) appears in the last column position instead
+ * of the first. The partition may or may not occur in the first row
+ * depending on which file is read first. The result is that the
+ * other columns will jump around. If we tried to create an expected
+ * result set, we'd be frustrated because the schema randomly changes.
+ * <p>
+ * Just to be clear: this behavior is a bug, not a feature. But, it is
+ * an established baseline for the "V2" reader.
+ * <p>
+ * This is really a test (demonstration) of the wrong behavior. This test
+ * is pretty unreliable. In particular, the position of the partition column
+ * seems to randomly shift from first to last position across runs.
+ */
+ @Test
+ public void testRaceV2() throws IOException {
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+
+ try {
+ enableV3(false);
+
+ // Special test-only feature to force even small scans
+ // to use more than one thread. Requires that the max
+ // parallelization option be set when starting the cluster.
+
+ client.alterSession(ExecConstants.MIN_READER_WIDTH_KEY, 2);
+
+ // Loop to run the query 10 times, or until we see the race
+
+ boolean sawRootFirst = false;
+ boolean sawNestedFirst = false;
+ boolean sawMissingPartition = false;
+ boolean sawPartitionFirst = false;
+ boolean sawPartitionLast = false;
+ for (int i = 0; i < 10; i++) {
+
+ // Read the two batches.
+
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+ for (int j = 0; j < 2; j++) {
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+
+ // Check location of partition column
+
+ int posn = rowSet.schema().index("dir0");
+ if (posn == -1) {
+ sawMissingPartition = true;
+ } else if (posn == 0) {
+ sawPartitionFirst = true;
+ } else {
+ sawPartitionLast = true;
+ }
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String col1 = reader.scalar("a").getString();
+ if (col1.equals("10")) {
+ if (i == 0) {
+ sawRootFirst = true;
+ }
+ } else {
+ if (i == 0) {
+ sawNestedFirst = true;
+ }
+ }
+ rowSet.clear();
+ }
+ assertFalse(iter.hasNext());
+ if (sawMissingPartition &&
+ sawPartitionFirst &&
+ sawPartitionLast &&
+ sawRootFirst &&
+ sawNestedFirst) {
+ // The following should appear most of the time.
+ System.out.println("All variations occurred");
+ return;
+ }
+ }
+
+ // If you see this, maybe something got fixed. Or, maybe the
+ // min parallelization hack above stopped working.
+ // Or, you were just unlucky and can try the test again.
+ // We print messages, rather than using assertTrue, to avoid
+ // introducing a flaky test.
+
+ System.out.println("Some variations did not occur");
+ System.out.println(String.format("Missing partition: %s", sawMissingPartition));
+ System.out.println(String.format("Partition first: %s", sawPartitionFirst));
+ System.out.println(String.format("Partition last: %s", sawPartitionLast));
+ System.out.println(String.format("Outer first: %s", sawRootFirst));
+ System.out.println(String.format("Nested first: %s", sawNestedFirst));
+ } finally {
+ resetV3();
+ client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
+ }
+ }
+
+ /**
+ * V3 computes partition depth in the group scan (which sees all files), and
+ * so the partition column count does not vary across scans. Also, V3 puts
+ * partition columns at the end of the row so that data columns don't
+ * "jump around" when files are shifted to a new partition depth.
+ */
+ @Test
+ public void testNoRaceV3() throws IOException {
+ String sql = "SELECT * FROM `dfs.data`.`%s`";
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .add("b", MinorType.VARCHAR)
+ .add("c", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .buildSchema();
+
+ try {
+ enableV3(true);
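+
+ // Same test-only feature as in testRaceV2: force even this small scan
+ // to use more than one reader thread.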
+ client.alterSession(ExecConstants.MIN_READER_WIDTH_KEY, 2);
+
+ // Loop to run the query 10 times or until we see both files
+ // in the first position.
+
+ boolean sawRootFirst = false;
+ boolean sawNestedFirst = false;
+ for (int i = 0; i < 10; i++) {
+
+ // First batch is empty; just carries the schema.
+
+ Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
+ assertTrue(iter.hasNext());
+ RowSet rowSet = iter.next();
+ assertEquals(0, rowSet.rowCount());
+ rowSet.clear();
+
+ // Read the two batches.
+
+ for (int j = 0; j < 2; j++) {
+ assertTrue(iter.hasNext());
+ rowSet = iter.next();
+
+ // Figure out which record this is and test accordingly.
+
+ RowSetReader reader = rowSet.reader();
+ assertTrue(reader.next());
+ String col1 = reader.scalar("a").getString();
+ if (col1.equals("10")) {
+ if (i == 0) {
+ sawRootFirst = true;
+ }
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("10", "foo", "bar", null)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ } else {
+ if (i == 0) {
+ sawNestedFirst = true;
+ }
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow("20", "fred", "wilma", NESTED_DIR)
+ .build();
+ RowSetUtilities.verify(expected, rowSet);
+ }
+ }
+ assertFalse(iter.hasNext());
+ if (sawRootFirst &&
+ sawNestedFirst) {
+ // The following should appear most of the time.
+ System.out.println("Both variations occurred");
+ return;
+ }
+ }
+
+ // If you see this, maybe something got fixed. Or, maybe the
+ // min parallelization hack above stopped working.
+ // Or, you were just unlucky and can try the test again.
+ // We print messages, rather than using assertTrue, to avoid
+ // introducing a flaky test.
+
+ System.out.println("Some variations did not occur");
+ System.out.println(String.format("Outer first: %s", sawRootFirst));
+ System.out.println(String.format("Nested first: %s", sawNestedFirst));
+ } finally {
+ resetV3();
+ client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
index 6eb9bbfbe..43ad0d86d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
@@ -61,6 +61,8 @@ public class TestNewTextReader extends BaseTestQuery {
fail("Query should have failed");
} catch(UserRemoteException ex) {
assertEquals(ErrorType.DATA_READ, ex.getErrorType());
+ // Change to the following if V3 is enabled
+ // assertEquals(ErrorType.VALIDATION, ex.getErrorType());
assertTrue("Error message should contain " + COL_NAME, ex.getMessage().contains(COL_NAME));
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index a9d2977b4..601356e28 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -525,7 +525,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
public static final String EXPLAIN_PLAN_JSON = "json";
public static ClusterFixtureBuilder builder(BaseDirTestWatcher dirTestWatcher) {
- ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+ ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
.sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE);
Properties props = new Properties();
props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 629714b36..63b818e2e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -366,7 +366,7 @@ public class QueryBuilder {
}
}
- public QueryRowSetIterator rowSetIterator( ) {
+ public QueryRowSetIterator rowSetIterator() {
return new QueryRowSetIterator(client.allocator(), withEventListener());
}