Diffstat (limited to 'exec')
59 files changed, 5141 insertions, 696 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 93adda17c..25ac2c9f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -713,6 +713,14 @@ public final class ExecConstants { public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY, new OptionDescription("Enables the text reader that complies with the RFC 4180 standard for text/csv files.")); + public static final String ENABLE_V3_TEXT_READER_KEY = "exec.storage.enable_v3_text_reader"; + public static final OptionValidator ENABLE_V3_TEXT_READER = new BooleanValidator(ENABLE_V3_TEXT_READER_KEY, + new OptionDescription("Enables the row set based version of the text/csv reader.")); + + public static final String MIN_READER_WIDTH_KEY = "exec.storage.min_width"; + public static final OptionValidator MIN_READER_WIDTH = new LongValidator(MIN_READER_WIDTH_KEY, + new OptionDescription("Min width for text readers, mostly for testing.")); + public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json"; public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java index 84aebdcd0..ebd7288a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java @@ -31,7 +31,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; public abstract class AbstractExchange extends AbstractSingle implements Exchange { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class); // Ephemeral info for generating execution fragments. protected int senderMajorFragmentId; @@ -77,7 +76,7 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang } /** - * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrances + * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrences * in given endpoint list. * * @param fragmentEndpoints Drillbit endpoint assignments of fragments. 
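The two options added to ExecConstants above, exec.storage.enable_v3_text_reader and exec.storage.min_width, are ordinary boolean and long system/session options; they are registered in SystemOptionManager later in this diff. As a hypothetical illustration only (the helper class below is not part of this commit), code that already holds an OptionManager could read them as follows:

// Illustrative sketch, not part of this change. Assumes only the option keys
// introduced above.
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.options.OptionManager;

public class TextReaderOptions {

  // True if the row-set based (V3) text reader should be used.
  public static boolean useV3Reader(OptionManager options) {
    return options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
  }

  // Minimum reader width; mostly useful to force parallelism in tests.
  public static int minReaderWidth(OptionManager options) {
    return (int) options.getLong(ExecConstants.MIN_READER_WIDTH_KEY);
  }
}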
@@ -111,7 +110,6 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang setupSenders(senderLocations); } - @Override public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { this.receiverMajorFragmentId = majorFragmentId; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java index 3bb1e5403..a5229f9d0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.drill.exec.store.dfs.FileSelection; public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class); public AbstractFileGroupScan(String userName) { super(userName); @@ -46,5 +45,4 @@ public abstract class AbstractFileGroupScan extends AbstractGroupScan implements public boolean supportsPartitionFilterPushdown() { return true; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java index d744db424..db4c73d9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java @@ -25,8 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -public abstract class AbstractSubScan extends AbstractBase implements SubScan{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class); +public abstract class AbstractSubScan extends AbstractBase implements SubScan { public AbstractSubScan(String userName) { super(userName); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 063e26bef..1abc3d804 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -499,7 +499,6 @@ public class ScanBatch implements CloseableRecordBatch { schemaChanged = false; } - @SuppressWarnings("resource") private <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz, boolean isImplicitField) throws SchemaChangeException { Map<String, ValueVector> fieldVectorMap; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index c9d97bf72..436aea06b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -42,7 +42,6 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import 
org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.DrillFuncHolderExpr; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; @@ -77,21 +76,21 @@ import java.util.List; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); + + private static final String EMPTY_STRING = ""; + private Projector projector; private List<ValueVector> allocationVectors; private List<ComplexWriter> complexWriters; private List<FieldReference> complexFieldReferencesList; private boolean hasRemainder = false; - private int remainderIndex = 0; + private int remainderIndex; private int recordCount; - private ProjectMemoryManager memoryManager; - - - private static final String EMPTY_STRING = ""; private boolean first = true; private boolean wasNone = false; // whether a NONE iter outcome was already seen + private ColumnExplorer columnExplorer; private class ClassifierResult { public boolean isStar = false; @@ -114,6 +113,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); + columnExplorer = new ColumnExplorer(context.getOptions()); } @Override @@ -121,14 +121,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { return recordCount; } - @Override protected void killIncoming(final boolean sendUpstream) { super.killIncoming(sendUpstream); hasRemainder = false; } - @Override public IterOutcome innerNext() { if (wasNone) { @@ -145,7 +143,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { @Override public VectorContainer getOutgoingContainer() { - return this.container; + return container; } @Override @@ -204,14 +202,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } incomingRecordCount = incoming.getRecordCount(); memoryManager.update(); - logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}", + logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}", memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this); } } } if (complexWriters != null && getLastKnownOutcome() == EMIT) { - throw new UnsupportedOperationException("Currently functions producing complex types as output is not " + + throw new UnsupportedOperationException("Currently functions producing complex types as output are not " + "supported in project list for subquery between LATERAL and UNNEST. 
Please re-write the query using this " + "function in the projection list of outermost query."); } @@ -219,7 +217,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { first = false; container.zeroVectors(); - int maxOuputRecordCount = memoryManager.getOutputRowCount(); logger.trace("doWork():[2] memMgr RC {}, incoming rc {}, incoming {}, project {}", memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this); @@ -233,7 +230,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { long projectEndTime = System.currentTimeMillis(); logger.trace("doWork(): projection: records {}, time {} ms", outputRecords, (projectEndTime - projectStartTime)); - if (outputRecords < incomingRecordCount) { setValueCount(outputRecords); hasRemainder = true; @@ -277,7 +273,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final int projRecords = projector.projectRecords(this.incoming, remainderIndex, recordsToProcess, 0); long projectEndTime = System.currentTimeMillis(); - logger.trace("handleRemainder: projection: " + "records {}, time {} ms", projRecords,(projectEndTime - projectStartTime)); + logger.trace("handleRemainder: projection: records {}, time {} ms", projRecords,(projectEndTime - projectStartTime)); if (projRecords < remainingRecordCount) { setValueCount(projRecords); @@ -463,7 +459,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); memoryManager.addNewField(vv, write); - final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); + cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); } } continue; @@ -546,7 +542,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector); final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe); - final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); + cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); memoryManager.addNewField(ouputVector, write); // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector. 
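The hunk just below switches isImplicitFileColumn() from the static ColumnExplorer.initImplicitFileColumns() lookup to an instance-level probe; the single-argument ColumnExplorer constructor it relies on is added later in this diff. A hedged sketch of that probe usage, assuming an OptionManager named options and the default column labels:

// Illustrative fragment only; "filename" is one of Drill's implicit file
// columns and "dir0" matches the default partition designator pattern.
ColumnExplorer explorer = new ColumnExplorer(options);
boolean isFileCol = explorer.isImplicitFileColumn("filename"); // true with default labels
boolean isImplicit = explorer.isImplicitColumn("dir0");        // true: partition column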
@@ -590,7 +586,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } private boolean isImplicitFileColumn(ValueVector vvIn) { - return ColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null; + return columnExplorer.isImplicitFileColumn(vvIn.getField().getName()); } private List<NamedExpression> getExpressionList() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java index 966a03901..b9b3e782e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java @@ -24,7 +24,7 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.Scan import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn; import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl; import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn; -import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput; +import org.apache.drill.exec.store.easy.text.compliant.v3.TextReader; /** * Parses the `columns` array. Doing so is surprisingly complex. @@ -113,14 +113,14 @@ public class ColumnsArrayParser implements ScanProjectionParser { if (inCol.isArray()) { int maxIndex = inCol.maxIndex(); - if (maxIndex > RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS) { + if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) { throw UserException .validationError() .message(String.format( "`columns`[%d] index out of bounds, max supported size is %d", - maxIndex, RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)) + maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS)) .addContext("Column", inCol.name()) - .addContext("Maximum index", RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS) + .addContext("Maximum index", TextReader.MAXIMUM_NUMBER_COLUMNS) .build(logger); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java index 8352dfa07..1166a5234 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java @@ -69,6 +69,7 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File private List<FileSplit> spilts = new ArrayList<>(); private Iterator<FileSplit> splitIter; private Path scanRootDir; + private int partitionDepth; protected DrillFileSystem dfs; private FileMetadataManager metadataManager; @@ -82,12 +83,18 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File /** * Specify the selection root for a directory scan, if any. - * Used to populate partition columns. + * Used to populate partition columns. Also, specify the maximum + * partition depth. 
+ * * @param rootPath Hadoop file path for the directory + * @param partitionDepth maximum partition depth across all files + * within this logical scan operator (files in this scan may be + * shallower) */ - public void setSelectionRoot(Path rootPath) { + public void setSelectionRoot(Path rootPath, int partitionDepth) { this.scanRootDir = rootPath; + this.partitionDepth = partitionDepth; } @Override @@ -122,7 +129,10 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File metadataManager = new FileMetadataManager( context.getFragmentContext().getOptions(), + true, // Expand partition columns with wildcard + false, // Put partition columns after table columns scanRootDir, + partitionDepth, paths); scanOrchestrator.withMetadata(metadataManager); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java index ae8502b52..a4ace55be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java @@ -46,6 +46,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { private boolean hasImplicitCols; + private boolean expandPartitionsAtEnd; + public FileMetadataColumnsParser(FileMetadataManager metadataManager) { this.metadataManager = metadataManager; partitionPattern = Pattern.compile(metadataManager.partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE); @@ -123,8 +125,10 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { } private void buildWildcard() { - if (metadataManager.useLegacyWildcardExpansion && - metadataManager.useLegacyExpansionLocation) { + if (!metadataManager.useLegacyWildcardExpansion) { + return; + } + if (metadataManager.useLegacyExpansionLocation) { // Star column: this is a SELECT * query. @@ -134,6 +138,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { // set is constant across all files. expandPartitions(); + } else { + expandPartitionsAtEnd = true; } } @@ -144,8 +150,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { // feature to expand partitions for wildcards, and we want the // partitions after data columns. - if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion && - ! metadataManager.useLegacyExpansionLocation) { + if (expandPartitionsAtEnd) { expandPartitions(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java index ba49a9f54..201d30859 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java @@ -41,24 +41,51 @@ import org.apache.hadoop.fs.Path; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +/** + * Manages the insertion of file metadata (AKA "implicit" and partition) columns. + * Parses the file metadata columns from the projection list. Creates and loads + * the vectors that hold the data. If running in legacy mode, inserts partition + * columns when the query contains a wildcard. 
Supports renaming the columns via + * session options. + * <p> + * The lifecycle is that the manager is given the set of files for this scan + * operator so it can determine the partition depth. (Note that different scans + * may not agree on the depth. This is a known issue with Drill's implementation.) + * <p> + * Then, at the start of the scan, all projection columns are parsed. This class + * picks out the file metadata columns. + * <p> + * On each file (on each reader), the columns are "resolved." Here, that means + * that the columns are filled in with actual values based on the present file. + * <p> + * This is the successor to {@link ColumnExplorer}. + */ + public class FileMetadataManager implements MetadataManager, SchemaProjectionResolver, VectorSource { + /** + * Automatically compute partition depth from files. Use only + * for testing! + */ + + public static final int AUTO_PARTITION_DEPTH = -1; + // Input - private Path scanRootDir; + private final Path scanRootDir; + private final int partitionCount; private FileMetadata currentFile; // Config protected final String partitionDesignator; - protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>(); - protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap(); + protected final List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>(); + protected final Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap(); /** * Indicates whether to expand partition columns when the query contains a wildcard. * Supports queries such as the following:<code><pre> - * select * from dfs.`partitioned-dir` - * </pre><code> + * select * from dfs.`partitioned-dir`</pre></code> * In which the output columns will be (columns, dir0) if the partitioned directory * has one level of nesting. * @@ -86,7 +113,6 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes private final List<MetadataColumn> metadataColumns = new ArrayList<>(); private ConstantColumnLoader loader; private VectorContainer outputContainer; - private final int partitionCount; /** * Specifies whether to plan based on the legacy meaning of "*". See @@ -111,10 +137,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes public FileMetadataManager(OptionSet optionManager, boolean useLegacyWildcardExpansion, boolean useLegacyExpansionLocation, - Path rootDir, List<Path> files) { + Path rootDir, + int partitionCount, + List<Path> files) { this.useLegacyWildcardExpansion = useLegacyWildcardExpansion; this.useLegacyExpansionLocation = useLegacyExpansionLocation; - scanRootDir = rootDir; partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); for (ImplicitFileColumns e : ImplicitFileColumns.values()) { @@ -129,28 +156,32 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes // The files and root dir are optional. - if (scanRootDir == null || files == null) { - partitionCount = 0; + if (rootDir == null || files == null) { + scanRootDir = null; + this.partitionCount = 0; // Special case in which the file is the same as the // root directory (occurs for a query with only one file.) - } else if (files.size() == 1 && scanRootDir.equals(files.get(0))) { + } else if (files.size() == 1 && rootDir.equals(files.get(0))) { scanRootDir = null; - partitionCount = 0; + this.partitionCount = 0; } else { + scanRootDir = rootDir; - // Compute the partitions. 
+ // Compute the partitions. Normally the count is passed in. + // But, handle the case where the count is unknown. Note: use this + // convenience only in testing since, in production, it can result + // in different scans reporting different numbers of partitions. - partitionCount = computeMaxPartition(files); + if (partitionCount == -1) { + this.partitionCount = computeMaxPartition(files); + } else { + this.partitionCount = partitionCount; + } } } - public FileMetadataManager(OptionSet optionManager, - Path rootDir, List<Path> files) { - this(optionManager, false, false, rootDir, files); - } - private int computeMaxPartition(List<Path> files) { int maxLen = 0; for (Path filePath : files) { @@ -165,6 +196,17 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes this.vectorCache = vectorCache; } + /** + * Returns the file metadata column parser that: + * <ul> + * <li>Picks out the file metadata and partition columns,</li> + * <li>Inserts partition columns for a wildcard query, if the + * option to do so is set.</li> + * </ul> + * + * @see {@link #useLegacyWildcardExpansion} + */ + @Override public ScanProjectionParser projectionParser() { return parser; } @@ -223,6 +265,10 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes currentFile = null; } + /** + * Resolves metadata columns to concrete, materialized columns with the + * proper value for the present file. + */ @Override public boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple, TupleMetadata tableSchema) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java index a5d6ca2a1..926936550 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java @@ -282,7 +282,9 @@ public class ScanSchemaOrchestrator { ScanProjectionParser parser = metadataManager.projectionParser(); if (parser != null) { - parsers.add(parser); + // Insert in first position so that it is ensured to see + // any wildcard that exists + parsers.add(0, parser); } // Parse the projection list. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java index f109578e9..be45d569e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java @@ -87,17 +87,17 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer { "width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth()); // 1.5 Cap the parallelization width by max allowed parallelization per node - width = Math.max(1, Math.min(width, endpointPool.size()*parameters.getMaxWidthPerNode())); + width = Math.max(1, Math.min(width, endpointPool.size() * parameters.getMaxWidthPerNode())); - // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is if we the width is more, - // we end up allocating more work units to one or more endpoints that don't have those many work units. 
+ // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is if the width is more, + // we end up allocating more work units to one or more endpoints that don't have that many work units. width = Math.min(totalMaxWidth, width); // Step 2: Select the endpoints final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap(); // 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled. - for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) { + for (Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) { endpoints.put(entry.getKey(), 1); } int totalAssigned = endpoints.size(); @@ -105,15 +105,15 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer { // 2.2 Assign the remaining slots to endpoints proportional to the affinity of each endpoint int remainingSlots = width - endpoints.size(); while (remainingSlots > 0) { - for(EndpointAffinity epAf : endpointPool.values()) { + for (EndpointAffinity epAf : endpointPool.values()) { final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots); int currentAssignments = endpoints.get(epAf.getEndpoint()); - for(int i=0; - i < moreAllocation && + for (int i=0; + i < moreAllocation && totalAssigned < width && currentAssignments < parameters.getMaxWidthPerNode() && currentAssignments < epAf.getMaxWidth(); - i++) { + i++) { totalAssigned++; currentAssignments++; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 8de57bca0..39b699fe4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -179,7 +179,7 @@ public class SimpleParallelizer implements ParallelizationParameters { public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) { planningSet.get(rootFragment); - for(ExchangeFragmentPair fragmentPair : rootFragment) { + for (ExchangeFragmentPair fragmentPair : rootFragment) { initFragmentWrappers(fragmentPair.getNode(), planningSet); } } @@ -193,7 +193,7 @@ public class SimpleParallelizer implements ParallelizationParameters { private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) { // Set up dependency of fragments based on the affinity of exchange that separates the fragments. - for(Wrapper currentFragmentWrapper : planningSet) { + for (Wrapper currentFragmentWrapper : planningSet) { ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair(); if (sendingExchange != null) { ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency(); @@ -209,17 +209,17 @@ public class SimpleParallelizer implements ParallelizationParameters { // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and - // remove the fragment on which the current fragment depends on. + // remove the fragment on which the current fragment depends. 
final Set<Wrapper> roots = Sets.newHashSet(); - for(Wrapper w : planningSet) { + for (Wrapper w : planningSet) { roots.add(w); } - for(Wrapper wrapper : planningSet) { + for (Wrapper wrapper : planningSet) { final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); if (fragmentDependencies != null && fragmentDependencies.size() > 0) { - for(Wrapper dependency : fragmentDependencies) { + for (Wrapper dependency : fragmentDependencies) { if (roots.contains(dependency)) { roots.remove(dependency); } @@ -241,7 +241,7 @@ public class SimpleParallelizer implements ParallelizationParameters { return; } - // First parallelize fragments on which this fragment depends on. + // First parallelize fragments on which this fragment depends. final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies(); if (fragmentDependencies != null && fragmentDependencies.size() > 0) { for(Wrapper dependency : fragmentDependencies) { @@ -288,20 +288,20 @@ public class SimpleParallelizer implements ParallelizationParameters { Preconditions.checkArgument(op instanceof FragmentRoot); FragmentRoot root = (FragmentRoot) op; - FragmentHandle handle = FragmentHandle // - .newBuilder() // - .setMajorFragmentId(wrapper.getMajorFragmentId()) // - .setMinorFragmentId(minorFragmentId) // - .setQueryId(queryId) // + FragmentHandle handle = FragmentHandle + .newBuilder() + .setMajorFragmentId(wrapper.getMajorFragmentId()) + .setMinorFragmentId(minorFragmentId) + .setQueryId(queryId) .build(); - PlanFragment fragment = PlanFragment.newBuilder() // - .setForeman(foremanNode) // - .setHandle(handle) // - .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) // - .setLeafFragment(isLeafFragment) // + PlanFragment fragment = PlanFragment.newBuilder() + .setForeman(foremanNode) + .setHandle(handle) + .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) + .setLeafFragment(isLeafFragment) .setContext(queryContextInfo) - .setMemInitial(wrapper.getInitialAllocation())// + .setMemInitial(wrapper.getInitialAllocation()) .setMemMax(wrapper.getMaxAllocation()) .setCredentials(session.getCredentials()) .addAllCollector(CountRequiredFragments.getCollectors(root)) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index 760a6ba5b..a07d3ed58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -111,7 +111,7 @@ public class ScanPrel extends DrillScanRelBase implements Prel, HasDistributionA final ScanStats stats = this.getGroupScan().getScanStats(settings); final int columnCount = this.getRowType().getFieldCount(); - if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { + if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) { return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java index b4fada61f..4d702a830 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java @@ -30,8 +30,8 @@ public class ScanPrule extends Prule{ public ScanPrule() { 
super(RelOptHelper.any(DrillScanRel.class), "Prel.ScanPrule"); - } + @Override public void onMatch(RelOptRuleCall call) { final DrillScanRel scan = (DrillScanRel) call.rel(0); @@ -48,5 +48,4 @@ public class ScanPrule extends Prule{ call.transformTo(newScan); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java index fa8e69d0b..267cecd4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java @@ -159,13 +159,13 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive class MajorFragmentStat { private DistributionAffinity distributionAffinity = DistributionAffinity.NONE; - private double maxRows = 0d; + private double maxRows; private int maxWidth = Integer.MAX_VALUE; - private boolean isMultiSubScan = false; - private boolean rightSideOfLateral = false; + private boolean isMultiSubScan; + private boolean rightSideOfLateral; //This flag if true signifies that all the Rels thus far //are simple rels with no distribution requirement. - private boolean isSimpleRel = false; + private boolean isSimpleRel; public void add(Prel prel) { maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows); @@ -196,7 +196,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive return false; } - int suggestedWidth = (int) Math.ceil((maxRows+1)/targetSliceSize); + int suggestedWidth = (int) Math.ceil((maxRows + 1) / targetSliceSize); int w = Math.min(maxWidth, suggestedWidth); if (w < 1) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 9cd670e99..e201e2686 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -242,6 +242,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR), new OptionDefinition(ExecConstants.SCALAR_REPLACEMENT_VALIDATOR), new OptionDefinition(ExecConstants.ENABLE_NEW_TEXT_READER), + new OptionDefinition(ExecConstants.ENABLE_V3_TEXT_READER), + new OptionDefinition(ExecConstants.MIN_READER_WIDTH), new OptionDefinition(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST), new OptionDefinition(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE), new OptionDefinition(ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java index 33b500018..1ae2d5e9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java @@ -55,14 +55,30 @@ public class ColumnExplorer { */ public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) { this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); - this.columns = columns; - this.isStarQuery = columns != null && 
Utilities.isStarQuery(columns); this.selectedPartitionColumns = Lists.newArrayList(); this.tableColumns = Lists.newArrayList(); this.allImplicitColumns = initImplicitFileColumns(optionManager); this.selectedImplicitColumns = CaseInsensitiveMap.newHashMap(); + if (columns == null) { + isStarQuery = false; + this.columns = null; + } else { + this.columns = columns; + this.isStarQuery = Utilities.isStarQuery(columns); + init(); + } + } - init(); + /** + * Constructor for using the column explorer to probe existing columns in the + * {@link ProjectRecordBatch}. + */ + // TODO: This is awkward. This class is being used for two distinct things: + // 1. The definition of the metadata columns, and + // 2. The projection of metadata columns in a particular query. + // Would be better to separate these two concepts. + public ColumnExplorer(OptionManager optionManager) { + this(optionManager, null); } /** @@ -123,6 +139,15 @@ public class ColumnExplorer { return matcher.matches(); } + public boolean isImplicitColumn(String name) { + return isPartitionColumn(partitionDesignator, name) || + isImplicitFileColumn(name); + } + + public boolean isImplicitFileColumn(String name) { + return allImplicitColumns.get(name) != null; + } + /** * Returns list with partition column names. * For the case when table has several levels of nesting, max level is chosen. @@ -132,10 +157,24 @@ public class ColumnExplorer { * @return list with partition column names. */ public static List<String> getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) { - int partitionsCount = 0; + int partitionsCount = getPartitionDepth(selection); + + String partitionColumnLabel = schemaConfig.getOption( + ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; + List<String> partitions = new ArrayList<>(); + + // generates partition column names: dir0, dir1 etc. + for (int i = 0; i < partitionsCount; i++) { + partitions.add(partitionColumnLabel + i); + } + return partitions; + } + + public static int getPartitionDepth(FileSelection selection) { // a depth of table root path int rootDepth = selection.getSelectionRoot().depth(); + int partitionsCount = 0; for (Path file : selection.getFiles()) { // Calculates partitions count for the concrete file: // depth of file path - depth of table root path - 1. @@ -145,15 +184,7 @@ public class ColumnExplorer { // max depth of files path should be used to handle all partitions partitionsCount = Math.max(partitionsCount, currentPartitionsCount); } - - String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; - List<String> partitions = new ArrayList<>(); - - // generates partition column names: dir0, dir1 etc. 
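To make the arithmetic in the extracted getPartitionDepth() method concrete, here is a small sketch using Hadoop Path depths; the directory names are invented for illustration:

// Illustrative sketch only: partition count per file =
// file path depth - selection root depth - 1; the maximum across files wins.
import org.apache.hadoop.fs.Path;

public class PartitionDepthSketch {
  public static void main(String[] args) {
    Path root = new Path("/data/logs");                // selection root, depth() == 2
    Path file1 = new Path("/data/logs/2018/01/a.csv"); // depth() == 5 -> 5 - 2 - 1 = 2
    Path file2 = new Path("/data/logs/b.csv");         // depth() == 3 -> 3 - 2 - 1 = 0
    int depth = Math.max(file1.depth() - root.depth() - 1,
                         file2.depth() - root.depth() - 1);
    System.out.println(depth); // 2, so columns dir0 and dir1 are generated
  }
}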
- for (int i = 0; i < partitionsCount; i++) { - partitions.add(partitionColumnLabel + i); - } - return partitions; + return partitionsCount; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java index 13017fadc..254cddbe3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java @@ -73,5 +73,4 @@ public interface FormatPlugin { Configuration getFsConf(); DrillbitContext getContext(); String getName(); - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index d76c6489e..dc1f053a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.drill.shaded.guava.com.google.common.base.Functions; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; @@ -39,10 +40,16 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.physical.impl.WriterRecordBatch; import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch; +import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch; +import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec; +import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; @@ -55,67 +62,335 @@ import org.apache.drill.exec.store.dfs.FormatMatcher; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.schedule.CompleteFileWork; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +/** + * Base class for various file readers. + * <p> + * This version provides a bridge between the legacy {@link RecordReader}-style + * readers and the newer {@link FileBatchReader} style. Over time, split the + * class, or provide a cleaner way to handle the differences. 
+ * + * @param <T> the format plugin config for this reader + */ + public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin { - @SuppressWarnings("unused") - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class); + /** + * Defines the static, programmer-defined options for this plugin. These + * options are attributes of how the plugin works. The plugin config, + * defined in the class definition, provides user-defined options that can + * vary across uses of the plugin. + */ + + public static class EasyFormatConfig { + public BasicFormatMatcher matcher; + public boolean readable = true; + public boolean writable; + public boolean blockSplittable; + public boolean compressible; + public Configuration fsConf; + public List<String> extensions; + public String defaultName; + + // Config options that, prior to Drill 1.15, required the plugin to + // override methods. Moving forward, plugins should be migrated to + // use this simpler form. New plugins should use these options + // instead of overriding methods. + + public boolean supportsProjectPushdown; + public boolean supportsAutoPartitioning; + public int readerOperatorType = -1; + public int writerOperatorType = -1; + } + + /** + * Creates the scan batch to use with the plugin. Drill supports the "classic" + * style of scan batch and readers, along with the newer size-aware, + * component-based version. The implementation of this class assembles the + * readers and scan batch operator as needed for each version. + */ + + public interface ScanBatchCreator { + CloseableRecordBatch buildScan( + final FragmentContext context, EasySubScan scan) + throws ExecutionSetupException; + } + + /** + * Use the original scanner based on the {@link RecordReader} interface. + * Requires that the storage plugin roll its own solutions for null columns. + * Is not able to limit vector or batch sizes. Retained or backward + * compatibility with "classic" format plugins which have not yet been + * upgraded to use the new framework. + */ + + public static class ClassicScanBatchCreator implements ScanBatchCreator { + + private final EasyFormatPlugin<? extends FormatPluginConfig> plugin; + + public ClassicScanBatchCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) { + this.plugin = plugin; + } + + @Override + public CloseableRecordBatch buildScan( + final FragmentContext context, EasySubScan scan) throws ExecutionSetupException { + final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns()); + + if (! 
columnExplorer.isStarQuery()) { + scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), + columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth()); + scan.setOperatorId(scan.getOperatorId()); + } + + final OperatorContext oContext = context.newOperatorContext(scan); + final DrillFileSystem dfs; + try { + dfs = oContext.newFileSystem(plugin.easyConfig().fsConf); + } catch (final IOException e) { + throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e); + } + + final List<RecordReader> readers = new LinkedList<>(); + final List<Map<String, String>> implicitColumns = Lists.newArrayList(); + Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap(); + final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null; + for (final FileWork work : scan.getWorkUnits()) { + final RecordReader recordReader = getRecordReader( + plugin, context, dfs, work, scan.getColumns(), scan.getUserName()); + readers.add(recordReader); + final List<String> partitionValues = ColumnExplorer.listPartitionValues( + work.getPath(), scan.getSelectionRoot(), false); + final Map<String, String> implicitValues = columnExplorer.populateImplicitColumns( + work.getPath(), partitionValues, supportsFileImplicitColumns); + implicitColumns.add(implicitValues); + if (implicitValues.size() > mapWithMaxColumns.size()) { + mapWithMaxColumns = implicitValues; + } + } + + // all readers should have the same number of implicit columns, add missing ones with value null + final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null)); + for (final Map<String, String> map : implicitColumns) { + map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); + } + + return new ScanBatch(context, oContext, readers, implicitColumns); + } + + /** + * Create a record reader given a file system, a file description and other + * information. For backward compatibility, calls the plugin method by + * default. + * + * @param plugin + * the plugin creating the scan + * @param context + * fragment context for the fragment running the scan + * @param dfs + * Drill's distributed file system facade + * @param fileWork + * description of the file to scan + * @param columns + * list of columns to project + * @param userName + * the name of the user performing the scan + * @return a scan operator + * @throws ExecutionSetupException + * if anything goes wrong + */ + + public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin, + FragmentContext context, DrillFileSystem dfs, FileWork fileWork, + List<SchemaPath> columns, String userName) throws ExecutionSetupException { + return plugin.getRecordReader(context, dfs, fileWork, columns, userName); + } + } + + /** + * Revised scanner based on the revised + * {@link ResultSetLoader} and {@link RowBatchReader} classes. + * Handles most projection tasks automatically. Able to limit + * vector and batch sizes. Use this for new format plugins. + */ + + public abstract static class ScanFrameworkCreator + implements ScanBatchCreator { + + protected EasyFormatPlugin<? extends FormatPluginConfig> plugin; + + public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) { + this.plugin = plugin; + } + + /** + * Builds the revised {@link FileBatchReader}-based scan batch. 
+ * + * @param context + * @param scan + * @return + * @throws ExecutionSetupException + */ + + @Override + public CloseableRecordBatch buildScan( + final FragmentContext context, + final EasySubScan scan) throws ExecutionSetupException { + + // Assemble the scan operator and its wrapper. + + try { + final BaseFileScanFramework<?> framework = buildFramework(scan); + final Path selectionRoot = scan.getSelectionRoot(); + if (selectionRoot != null) { + framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth()); + } + return new OperatorRecordBatch( + context, scan, + new ScanOperatorExec( + framework)); + } catch (final UserException e) { + // Rethrow user exceptions directly + throw e; + } catch (final Throwable e) { + // Wrap all others + throw new ExecutionSetupException(e); + } + } + + /** + * Create the plugin-specific framework that manages the scan. The framework + * creates batch readers one by one for each file or block. It defines semantic + * rules for projection. It handles "early" or "late" schema readers. A typical + * framework builds on standardized frameworks for files in general or text + * files in particular. + * + * @param scan the physical operation definition for the scan operation. Contains + * one or more files to read. (The Easy format plugin works only for files.) + * @return the scan framework which orchestrates the scan operation across + * potentially many files + * @throws ExecutionSetupException for all setup failures + */ + protected abstract BaseFileScanFramework<?> buildFramework( + EasySubScan scan) throws ExecutionSetupException; + } + + /** + * Generic framework creator for files that just use the basic file + * support: metadata, etc. Specialized use cases (special "columns" + * column, say) will require a specialized implementation. + */ + + public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator { + + private final FileReaderFactory readerCreator; + + public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin, + FileReaderFactory readerCreator) { + super(plugin); + this.readerCreator = readerCreator; + } + + @Override + protected FileScanFramework buildFramework( + EasySubScan scan) throws ExecutionSetupException { - private final BasicFormatMatcher matcher; + final FileScanFramework framework = new FileScanFramework( + scan.getColumns(), + scan.getWorkUnits(), + plugin.easyConfig().fsConf, + readerCreator); + return framework; + } + } + + private final String name; + private final EasyFormatConfig easyConfig; private final DrillbitContext context; - private final boolean readable; - private final boolean writable; - private final boolean blockSplittable; - private final Configuration fsConf; private final StoragePluginConfig storageConfig; protected final T formatConfig; - private final String name; - private final boolean compressible; + /** + * Legacy constructor. + */ protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf, - StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable, - boolean compressible, List<String> extensions, String defaultName){ - this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible); - this.readable = readable; - this.writable = writable; + StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, + boolean blockSplittable, + boolean compressible, List<String> extensions, String defaultName) { + this.name = name == null ? 
defaultName : name; + easyConfig = new EasyFormatConfig(); + easyConfig.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible); + easyConfig.readable = readable; + easyConfig.writable = writable; this.context = context; - this.blockSplittable = blockSplittable; - this.compressible = compressible; - this.fsConf = fsConf; + easyConfig.blockSplittable = blockSplittable; + easyConfig.compressible = compressible; + easyConfig.fsConf = fsConf; this.storageConfig = storageConfig; this.formatConfig = formatConfig; - this.name = name == null ? defaultName : name; } - @Override - public Configuration getFsConf() { - return fsConf; + /** + * Revised constructor in which settings are gathered into a configuration object. + * + * @param name name of the plugin + * @param config configuration options for this plugin which determine + * developer-defined runtime behavior + * @param context the global server-wide drillbit context + * @param storageConfig the configuration for the storage plugin that owns this + * format plugin + * @param formatConfig the Jackson-serialized format configuration as created + * by the user in the Drill web console. Holds user-defined options. + */ + + protected EasyFormatPlugin(String name, EasyFormatConfig config, DrillbitContext context, + StoragePluginConfig storageConfig, T formatConfig) { + this.name = name; + this.easyConfig = config; + this.context = context; + this.storageConfig = storageConfig; + this.formatConfig = formatConfig; + if (easyConfig.matcher == null) { + easyConfig.matcher = new BasicFormatMatcher(this, + easyConfig.fsConf, easyConfig.extensions, + easyConfig.compressible); + } } @Override - public DrillbitContext getContext() { - return context; - } + public Configuration getFsConf() { return easyConfig.fsConf; } @Override - public String getName() { - return name; - } + public DrillbitContext getContext() { return context; } - public abstract boolean supportsPushDown(); + public EasyFormatConfig easyConfig() { return easyConfig; } + + @Override + public String getName() { return name; } /** - * Whether or not you can split the format based on blocks within file boundaries. If not, the simple format engine will - * only split on file boundaries. + * Does this plugin support projection push down? That is, can the reader + * itself handle the tasks of projecting table columns, creating null + * columns for missing table columns, and so on? * - * @return True if splittable. + * @return <tt>true</tt> if the plugin supports projection push-down, + * <tt>false</tt> if Drill should do the task by adding a project operator */ - public boolean isBlockSplittable() { - return blockSplittable; - } + + public boolean supportsPushDown() { return easyConfig.supportsProjectPushdown; } + + /** + * Whether or not you can split the format based on blocks within file + * boundaries. If not, the simple format engine will only split on file + * boundaries. + * + * @return <code>true</code> if splittable. 
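As a hypothetical example of the revised, config-based constructor shown above (the plugin name, config class, file extension, and framework creator are invented for illustration), a new-style plugin might gather its static options into an EasyFormatConfig and pick its scan path through the scanBatchCreator() hook defined further down in this file:

// Hypothetical plugin skeleton; only constructor wiring and scan-path selection
// are shown. SampleFormatConfig and SampleFrameworkCreator are assumed classes.
import java.util.Arrays;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.hadoop.conf.Configuration;

public class SampleFormatPlugin extends EasyFormatPlugin<SampleFormatConfig> {

  private static EasyFormatConfig defineConfig(Configuration fsConf) {
    EasyFormatConfig config = new EasyFormatConfig();
    config.fsConf = fsConf;
    config.extensions = Arrays.asList("sample"); // made-up extension
    config.readable = true;
    config.writable = false;
    config.blockSplittable = false;
    config.compressible = true;
    config.supportsProjectPushdown = true;
    config.defaultName = "sample";
    return config;
  }

  public SampleFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
      StoragePluginConfig storageConfig, SampleFormatConfig formatConfig) {
    super(name, defineConfig(fsConf), context, storageConfig, formatConfig);
  }

  @Override
  protected ScanBatchCreator scanBatchCreator(OptionManager options) {
    // Use the framework-based scan only when the V3 option is enabled;
    // otherwise fall back to the classic ScanBatch path.
    if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) {
      return new SampleFrameworkCreator(this); // assumed ScanFrameworkCreator subclass
    }
    return new ClassicScanBatchCreator(this);
  }
}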
+ */ + public boolean isBlockSplittable() { return easyConfig.blockSplittable; } /** * Indicates whether or not this format could also be in a compression @@ -125,52 +400,40 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements * * @return <code>true</code> if it is compressible */ - public boolean isCompressible() { - return compressible; - } - - public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, - List<SchemaPath> columns, String userName) throws ExecutionSetupException; + public boolean isCompressible() { return easyConfig.compressible; } - CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException { - final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns()); - - if (!columnExplorer.isStarQuery()) { - scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), - columnExplorer.getTableColumns(), scan.getSelectionRoot()); - scan.setOperatorId(scan.getOperatorId()); - } + /** + * Return a record reader for the specific file format, when using the original + * {@link ScanBatch} scanner. + * @param context fragment context + * @param dfs Drill file system + * @param fileWork metadata about the file to be scanned + * @param columns list of projected columns (or may just contain the wildcard) + * @param userName the name of the user running the query + * @return a record reader for this format + * @throws ExecutionSetupException for many reasons + */ - OperatorContext oContext = context.newOperatorContext(scan); - final DrillFileSystem dfs; - try { - dfs = oContext.newFileSystem(fsConf); - } catch (IOException e) { - throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e); - } + public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, + List<SchemaPath> columns, String userName) throws ExecutionSetupException { + throw new ExecutionSetupException("Must implement getRecordReader() if using the legacy scanner."); + } - List<RecordReader> readers = new LinkedList<>(); - List<Map<String, String>> implicitColumns = Lists.newArrayList(); - Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap(); - boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null; - for (FileWork work : scan.getWorkUnits()){ - RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName()); - readers.add(recordReader); - List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false); - Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns); - implicitColumns.add(implicitValues); - if (implicitValues.size() > mapWithMaxColumns.size()) { - mapWithMaxColumns = implicitValues; - } - } + protected CloseableRecordBatch getReaderBatch(final FragmentContext context, + final EasySubScan scan) throws ExecutionSetupException { + return scanBatchCreator(context.getOptions()).buildScan(context, scan); + } - // all readers should have the same number of implicit columns, add missing ones with value null - Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null)); - for (Map<String, String> map : implicitColumns) { - map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); - } + /** + * Create the scan batch 
creator. Needed only when using the revised scan batch. In that + * case, override the <tt>readerIterator()</tt> method on the custom scan batch + * creator implementation. + * + * @return the strategy for creating the scan batch for this plugin + */ - return new ScanBatch(context, oContext, readers, implicitColumns); + protected ScanBatchCreator scanBatchCreator(OptionManager options) { + return new ClassicScanBatchCreator(this); } public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) { @@ -219,41 +482,28 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements } @Override - public T getConfig() { - return formatConfig; - } + public T getConfig() { return formatConfig; } @Override - public StoragePluginConfig getStorageConfig() { - return storageConfig; - } + public StoragePluginConfig getStorageConfig() { return storageConfig; } @Override - public boolean supportsRead() { - return readable; - } + public boolean supportsRead() { return easyConfig.readable; } @Override - public boolean supportsWrite() { - return writable; - } + public boolean supportsWrite() { return easyConfig.writable; } @Override - public boolean supportsAutoPartitioning() { - return false; - } + public boolean supportsAutoPartitioning() { return easyConfig.supportsAutoPartitioning; } @Override - public FormatMatcher getMatcher() { - return matcher; - } + public FormatMatcher getMatcher() { return easyConfig.matcher; } @Override public Set<StoragePluginOptimizerRule> getOptimizerRules() { return ImmutableSet.of(); } - public abstract int getReaderOperatorType(); - public abstract int getWriterOperatorType(); - + public int getReaderOperatorType() { return easyConfig.readerOperatorType; } + public int getWriterOperatorType() { return easyConfig.writerOperatorType; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 4449ec054..6a6243cd4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; @@ -57,9 +58,11 @@ import org.apache.hadoop.fs.Path; public class EasyGroupScan extends AbstractFileGroupScan { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class); - private FileSelection selection; private final EasyFormatPlugin<?> formatPlugin; + private FileSelection selection; + private int partitionDepth; private int maxWidth; + private int minWidth = 1; private List<SchemaPath> columns; private ListMultimap<Integer, CompleteFileWork> mappings; @@ -104,9 +107,23 @@ public class EasyGroupScan extends AbstractFileGroupScan { initFromSelection(selection, formatPlugin); } - @JsonIgnore - public Iterable<CompleteFileWork> getWorkIterable() { - return () -> Iterators.unmodifiableIterator(chunks.iterator()); + public EasyGroupScan( + String userName, + FileSelection selection, + 
EasyFormatPlugin<?> formatPlugin, + List<SchemaPath> columns, + Path selectionRoot, + int minWidth + ) throws IOException{ + this(userName, selection, formatPlugin, columns, selectionRoot); + + // Set the minimum width of this reader. Primarily used for testing + // to force parallelism even for small test files. + // See ExecConstants.MIN_READER_WIDTH + this.minWidth = Math.max(1, Math.min(minWidth, maxWidth)); + + // Compute the maximum partition depth across all files. + partitionDepth = ColumnExplorer.getPartitionDepth(selection); } private EasyGroupScan(final EasyGroupScan that) { @@ -118,17 +135,23 @@ public class EasyGroupScan extends AbstractFileGroupScan { chunks = that.chunks; endpointAffinities = that.endpointAffinities; maxWidth = that.maxWidth; + minWidth = that.minWidth; mappings = that.mappings; + partitionDepth = that.partitionDepth; + } + + @JsonIgnore + public Iterable<CompleteFileWork> getWorkIterable() { + return () -> Iterators.unmodifiableIterator(chunks.iterator()); } private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException { - @SuppressWarnings("resource") final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf()); this.selection = selection; BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits()); - this.chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable()); - this.maxWidth = chunks.size(); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); + chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable()); + maxWidth = chunks.size(); + endpointAffinities = AffinityCreator.getAffinityMap(chunks); } public Path getSelectionRoot() { @@ -136,11 +159,16 @@ public class EasyGroupScan extends AbstractFileGroupScan { } @Override + @JsonIgnore + public int getMinParallelizationWidth() { + return minWidth; + } + + @Override public int getMaxParallelizationWidth() { return maxWidth; } - @Override public ScanStats getScanStats(final PlannerSettings settings) { return formatPlugin.getScanStats(settings, this); @@ -163,7 +191,6 @@ public class EasyGroupScan extends AbstractFileGroupScan { return columns; } - @JsonIgnore public FileSelection getFileSelection() { return selection; @@ -180,7 +207,6 @@ public class EasyGroupScan extends AbstractFileGroupScan { return new EasyGroupScan(this); } - @Override public List<EndpointAffinity> getOperatorAffinity() { if (endpointAffinities == null) { @@ -217,7 +243,8 @@ public class EasyGroupScan extends AbstractFileGroupScan { Preconditions.checkArgument(!filesForMinor.isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId)); - EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot); + EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, + columns, selectionRoot, partitionDepth); subScan.setOperatorId(this.getOperatorId()); return subScan; } @@ -275,5 +302,4 @@ public class EasyGroupScan extends AbstractFileGroupScan { public boolean canPushdownProjects(List<SchemaPath> columns) { return formatPlugin.supportsPushDown(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java index fbb3f475c..c51c7ac0f 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java @@ -42,7 +42,8 @@ public class EasySubScan extends AbstractSubScan{ private final List<FileWorkImpl> files; private final EasyFormatPlugin<?> formatPlugin; private final List<SchemaPath> columns; - private Path selectionRoot; + private final Path selectionRoot; + private final int partitionDepth; @JsonCreator public EasySubScan( @@ -52,7 +53,8 @@ public class EasySubScan extends AbstractSubScan{ @JsonProperty("format") FormatPluginConfig formatConfig, @JacksonInject StoragePluginRegistry engineRegistry, @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("selectionRoot") Path selectionRoot + @JsonProperty("selectionRoot") Path selectionRoot, + @JsonProperty("partitionDepth") int partitionDepth ) throws ExecutionSetupException { super(userName); this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig); @@ -60,50 +62,40 @@ public class EasySubScan extends AbstractSubScan{ this.files = files; this.columns = columns; this.selectionRoot = selectionRoot; + this.partitionDepth = partitionDepth; } - public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, - Path selectionRoot){ + public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, + List<SchemaPath> columns, Path selectionRoot, int partitionDepth) { super(userName); this.formatPlugin = plugin; this.files = files; this.columns = columns; this.selectionRoot = selectionRoot; + this.partitionDepth = partitionDepth; } @JsonProperty - public Path getSelectionRoot() { - return selectionRoot; - } + public Path getSelectionRoot() { return selectionRoot; } + + @JsonProperty + public int getPartitionDepth() { return partitionDepth; } @JsonIgnore - public EasyFormatPlugin<?> getFormatPlugin(){ - return formatPlugin; - } + public EasyFormatPlugin<?> getFormatPlugin() { return formatPlugin; } @JsonProperty("files") - public List<FileWorkImpl> getWorkUnits() { - return files; - } + public List<FileWorkImpl> getWorkUnits() { return files; } @JsonProperty("storage") - public StoragePluginConfig getStorageConfig(){ - return formatPlugin.getStorageConfig(); - } + public StoragePluginConfig getStorageConfig() { return formatPlugin.getStorageConfig(); } @JsonProperty("format") - public FormatPluginConfig getFormatConfig(){ - return formatPlugin.getConfig(); - } + public FormatPluginConfig getFormatConfig() { return formatPlugin.getConfig(); } @JsonProperty("columns") - public List<SchemaPath> getColumns(){ - return columns; - } + public List<SchemaPath> getColumns() { return columns; } @Override - public int getOperatorType() { - return formatPlugin.getReaderOperatorType(); - } - + public int getOperatorType() { return formatPlugin.getReaderOperatorType(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 03ae6f554..05ed1b564 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -18,34 +18,45 @@ package org.apache.drill.exec.store.easy.text; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.List; 
import java.util.Map; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.FileReaderCreator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; import org.apache.drill.exec.store.dfs.easy.EasyGroupScan; +import org.apache.drill.exec.store.dfs.easy.EasySubScan; import org.apache.drill.exec.store.dfs.easy.EasyWriter; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader; import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings; +import org.apache.drill.exec.store.easy.text.compliant.v3.CompliantTextBatchReader; +import org.apache.drill.exec.store.easy.text.compliant.v3.TextParsingSettingsV3; import org.apache.drill.exec.store.schedule.CompleteFileWork; import org.apache.drill.exec.store.text.DrillTextRecordReader; import org.apache.drill.exec.store.text.DrillTextRecordWriter; @@ -61,91 +72,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> { - private final static String DEFAULT_NAME = "text"; +public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> + implements FileReaderCreator { + private final static String PLUGIN_NAME = "text"; - public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) { - super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true, - Collections.emptyList(), DEFAULT_NAME); - } - - public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, - TextFormatConfig formatPluginConfig) { - super(name, context, fsConf, config, formatPluginConfig, true, false, true, true, - formatPluginConfig.getExtensions(), DEFAULT_NAME); 
- } - - - @Override - public RecordReader getRecordReader(FragmentContext context, - DrillFileSystem dfs, - FileWork fileWork, - List<SchemaPath> columns, - String userName) { - Path path = dfs.makeQualified(fileWork.getPath()); - FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""}); - - if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) { - TextParsingSettings settings = new TextParsingSettings(); - settings.set(formatConfig); - return new CompliantTextRecordReader(split, dfs, settings, columns); - } else { - char delim = formatConfig.getFieldDelimiter(); - return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns); - } - } - - @Override - public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) - throws IOException { - return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot); - } - - @Override - public boolean supportsStatistics() { - return false; - } - - @Override - public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { - throw new UnsupportedOperationException("unimplemented"); - } - - @Override - public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException { - throw new UnsupportedOperationException("unimplemented"); - } - - @Override - protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) { - long data = 0; - for (final CompleteFileWork work : scan.getWorkIterable()) { - data += work.getTotalBytes(); - } - final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE); - final double estRowCount = data / estimatedRowSize; - return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data); - } - - @Override - public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException { - final Map<String, String> options = new HashMap<>(); - - options.put("location", writer.getLocation()); - FragmentHandle handle = context.getHandle(); - String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); - options.put("prefix", fragmentId); - options.put("separator", getConfig().getFieldDelimiterAsString()); - options.put("extension", getConfig().getExtensions().get(0)); - - RecordWriter recordWriter = new DrillTextRecordWriter( - context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf()); - recordWriter.init(options); - - return recordWriter; - } - - @JsonTypeName("text") @JsonInclude(Include.NON_DEFAULT) + @JsonTypeName(PLUGIN_NAME) + @JsonInclude(Include.NON_DEFAULT) public static class TextFormatConfig implements FormatPluginConfig { public List<String> extensions = ImmutableList.of(); @@ -157,34 +89,18 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm public boolean skipFirstLine = false; public boolean extractHeader = false; - public List<String> getExtensions() { - return extensions; - } - - public char getQuote() { - return quote; - } - - public char getEscape() { - return escape; - } + public TextFormatConfig() { } - public char getComment() { - return comment; - } - - public String getLineDelimiter() { - return lineDelimiter; - } - - public char getFieldDelimiter() { - return fieldDelimiter; - } + public List<String> getExtensions() { return extensions; } + public char getQuote() { 
return quote; } + public char getEscape() { return escape; } + public char getComment() { return comment; } + public String getLineDelimiter() { return lineDelimiter; } + public char getFieldDelimiter() { return fieldDelimiter; } + public boolean isSkipFirstLine() { return skipFirstLine; } @JsonIgnore - public boolean isHeaderExtractionEnabled() { - return extractHeader; - } + public boolean isHeaderExtractionEnabled() { return extractHeader; } @JsonIgnore public String getFieldDelimiterAsString(){ @@ -197,10 +113,6 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm this.fieldDelimiter = delimiter; } - public boolean isSkipFirstLine() { - return skipFirstLine; - } - @Override public int hashCode() { final int prime = 31; @@ -262,24 +174,173 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm } return true; } + } + + public static class TextScanBatchCreator extends ScanFrameworkCreator { + + private final FileReaderCreator readerCreator; + private final TextFormatPlugin textPlugin; + + public TextScanBatchCreator(TextFormatPlugin plugin, + FileReaderCreator readerCreator) { + super(plugin); + this.readerCreator = readerCreator; + textPlugin = plugin; + } + + @Override + protected ColumnsScanFramework buildFramework( + EasySubScan scan) throws ExecutionSetupException { + ColumnsScanFramework framework = new ColumnsScanFramework( + scan.getColumns(), + scan.getWorkUnits(), + plugin.easyConfig().fsConf, + readerCreator); + + // If this format has no headers, or wants to skip them, + // then we must use the columns column to hold the data. + + framework.requireColumnsArray( + ! textPlugin.getConfig().isHeaderExtractionEnabled()); + + // Text files handle nulls in an unusual way. Missing columns + // are set to required Varchar and filled with blanks. Yes, this + // means that the SQL statement or code cannot differentiate missing + // columns from empty columns, but that is how CSV and other text + // files have been defined within Drill. 
+ + framework.setNullType( + MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.REQUIRED) + .build()); + + return framework; + } + } + + public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) { + this(name, context, fsConf, storageConfig, new TextFormatConfig()); + } + + public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, + TextFormatConfig formatPluginConfig) { + super(name, easyConfig(fsConf, formatPluginConfig), context, config, formatPluginConfig); + } + + private static EasyFormatConfig easyConfig(Configuration fsConf, TextFormatConfig pluginConfig) { + EasyFormatConfig config = new EasyFormatConfig(); + config.readable = true; + config.writable = true; + config.blockSplittable = true; + config.compressible = true; + config.supportsProjectPushdown = true; + config.extensions = pluginConfig.getExtensions(); + config.fsConf = fsConf; + config.defaultName = PLUGIN_NAME; + config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE; + config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE; + return config; + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) + throws IOException { + return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot); + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List<SchemaPath> columns, OptionManager options) throws IOException { + return new EasyGroupScan(userName, selection, this, columns, + selection.selectionRoot, + // Some paths provide a null option manager. In that case, default to a + // min width of 1; just like the base class. + options == null ? 1 : (int) options.getLong(ExecConstants.MIN_READER_WIDTH_KEY)); + } + + @Override + protected ScanBatchCreator scanBatchCreator(OptionManager options) { + // Create the "legacy", "V2" reader or the new "V3" version based on + // the result set loader. This code should be temporary: the two + // readers provide identical functionality for the user; only the + // internals differ. + if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) { + return new TextScanBatchCreator(this, this); + } else { + return new ClassicScanBatchCreator(this); + } + } + + // TODO: Remove this once the V2 reader is removed. 
+ @Override + public RecordReader getRecordReader(FragmentContext context, + DrillFileSystem dfs, + FileWork fileWork, + List<SchemaPath> columns, + String userName) { + Path path = dfs.makeQualified(fileWork.getPath()); + FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""}); + + if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) { + TextParsingSettings settings = new TextParsingSettings(); + settings.set(formatConfig); + return new CompliantTextRecordReader(split, dfs, settings, columns); + } else { + char delim = formatConfig.getFieldDelimiter(); + return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns); + } + } + @Override + public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException { + final Map<String, String> options = new HashMap<>(); + options.put("location", writer.getLocation()); + FragmentHandle handle = context.getHandle(); + String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); + options.put("prefix", fragmentId); + options.put("separator", getConfig().getFieldDelimiterAsString()); + options.put("extension", getConfig().getExtensions().get(0)); + RecordWriter recordWriter = new DrillTextRecordWriter( + context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf()); + recordWriter.init(options); + + return recordWriter; } @Override - public int getReaderOperatorType() { - return CoreOperatorType.TEXT_SUB_SCAN_VALUE; + public ManagedReader<ColumnsSchemaNegotiator> makeBatchReader( + DrillFileSystem dfs, + FileSplit split) throws ExecutionSetupException { + TextParsingSettingsV3 settings = new TextParsingSettingsV3(); + settings.set(getConfig()); + return new CompliantTextBatchReader(split, dfs, settings); + } + @Override + public boolean supportsStatistics() { + return false; } @Override - public int getWriterOperatorType() { - return CoreOperatorType.TEXT_WRITER_VALUE; + public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { + throw new UnsupportedOperationException("unimplemented"); } @Override - public boolean supportsPushDown() { - return true; + public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException { + throw new UnsupportedOperationException("unimplemented"); } + @Override + protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) { + long data = 0; + for (final CompleteFileWork work : scan.getWorkIterable()) { + data += work.getTotalBytes(); + } + final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE); + final double estRowCount = data / estimatedRowSize; + return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java new file mode 100644 index 000000000..0ed3155a0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Original version of the "compliant" text reader. This is version 2 of + * the text reader. This version is retained for temporary backward + * compatibility as we productize the newer version 3 based on the + * row set framework. + * <p> + * TODO: Remove the files in this package and move the files from the + * "v3" sub-package here once the version 3 implementation stabilizes. + */ +package org.apache.drill.exec.store.easy.text.compliant; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java new file mode 100644 index 000000000..6bf0bb69c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; + +public abstract class BaseFieldOutput extends TextOutput { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); + private static final int MAX_FIELD_LENGTH = 1024 * 64; + + // track which field is getting appended + protected int currentFieldIndex = -1; + // track chars within field + protected int currentDataPointer; + // track if field is still getting appended + private boolean fieldOpen = true; + // holds chars for a field + protected byte[] fieldBytes; + protected final RowSetLoader writer; + private final boolean[] projectionMask; + protected final int maxField; + protected boolean fieldProjected; + + /** + * Initialize the field output for one of three scenarios: + * <ul> + * <li>SELECT all: SELECT *, SELECT columns. Indicated by a non -1 + * max fields.</li> + * <li>SELECT none: SELECT COUNT(*), etc. Indicated by a max field + * of -1.</li> + * <li>SELECT a, b, c indicated by a non-null projection mask that + * identifies the indexes of the fields to be selected. 
In this case, + * this constructor computes the maximum field.</li> + * </ul> + * + * @param writer Row set writer that provides access to the writer for + * each column + * @param maxField the index of the last field to store. May be -1 if no + * fields are to be stored. Computed if the projection mask is set + * @param projectionMask a boolean array indicating which fields are + * to be projected to the output. Optional + */ + + public BaseFieldOutput(RowSetLoader writer, int maxField, boolean[] projectionMask) { + this.writer = writer; + this.projectionMask = projectionMask; + + // If no projection mask is defined, then we want all columns + // up to the max field, which may be -1 if we want to select + // nothing. + + if (projectionMask == null) { + this.maxField = maxField; + } else { + + // Otherwise, use the projection mask to determine + // which fields are to be projected. (The file may well + // contain more than the projected set.) + + int end = projectionMask.length - 1; + while (end >= 0 && ! projectionMask[end]) { + end--; + } + this.maxField = end; + } + + // If we project at least one field, allocate a buffer. + + if (maxField >= 0) { + fieldBytes = new byte[MAX_FIELD_LENGTH]; + } + } + + /** + * Start a new record. Resets all pointers. + */ + + @Override + public void startRecord() { + currentFieldIndex = -1; + fieldOpen = false; + writer.start(); + } + + @Override + public void startField(int index) { + assert index == currentFieldIndex + 1; + currentFieldIndex = index; + currentDataPointer = 0; + fieldOpen = true; + + // Figure out if this field is projected. + + if (projectionMask == null) { + fieldProjected = currentFieldIndex <= maxField; + } else if (currentFieldIndex >= projectionMask.length) { + fieldProjected = false; + } else { + fieldProjected = projectionMask[currentFieldIndex]; + } + } + + @Override + public void append(byte data) { + if (! fieldProjected) { + return; + } + if (currentDataPointer >= MAX_FIELD_LENGTH - 1) { + throw UserException + .unsupportedError() + .message("Text column is too large.") + .addContext("Column", currentFieldIndex) + .addContext("Limit", MAX_FIELD_LENGTH) + .build(logger); + } + + fieldBytes[currentDataPointer++] = data; + } + + @Override + public boolean endField() { + fieldOpen = false; + return currentFieldIndex < maxField; + } + + @Override + public boolean endEmptyField() { + return endField(); + } + + @Override + public void finishRecord() { + if (fieldOpen) { + endField(); + } + writer.save(); + } + + @Override + public long getRecordCount() { + return writer.rowCount(); + } + + @Override + public boolean isFull() { + return writer.isFull(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java new file mode 100644 index 000000000..e489003c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.hadoop.mapred.FileSplit; + +import com.univocity.parsers.common.TextParsingException; + +import io.netty.buffer.DrillBuf; + +/** + * New text reader, complies with the RFC 4180 standard for text/csv files + */ +public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextBatchReader.class); + + private static final int MAX_RECORDS_PER_BATCH = 8096; + private static final int READ_BUFFER = 1024 * 1024; + private static final int WHITE_SPACE_BUFFER = 64 * 1024; + + // settings to be used while parsing + private final TextParsingSettingsV3 settings; + // Chunk of the file to be read by this reader + private final FileSplit split; + // text reader implementation + private TextReader reader; + // input buffer + private DrillBuf readBuffer; + // working buffer to handle whitespaces + private DrillBuf whitespaceBuffer; + private final DrillFileSystem dfs; + + private RowSetLoader writer; + + public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) { + this.split = split; + this.settings = settings; + this.dfs = dfs; + + // Validate. Otherwise, these problems show up later as a data + // read error which is very confusing. + + if (settings.getNewLineDelimiter().length == 0) { + throw UserException + .validationError() + .message("The text format line delimiter cannot be blank.") + .build(logger); + } + } + + /** + * Performs the initial setup required for the record reader. + * Initializes the input stream, handling of the output record batch + * and the actual reader to be used. + * @param context operator context from which buffer's will be allocated and managed + * @param outputMutator Used to create the schema in the output record batch + */ + + @Override + public boolean open(ColumnsSchemaNegotiator schemaNegotiator) { + final OperatorContext context = schemaNegotiator.context(); + + // Note: DO NOT use managed buffers here. They remain in existence + // until the fragment is shut down. The buffers here are large. 
+ // If we scan 1000 files, and allocate 1 MB for each, we end up + // holding onto 1 GB of memory in managed buffers. + // Instead, we allocate the buffers explicitly, and must free + // them. + + readBuffer = context.getAllocator().buffer(READ_BUFFER); + whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER); + + // TODO: Set this based on size of record rather than + // absolute count. + + schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH); + + // setup Output, Input, and Reader + try { + TextOutput output; + + if (settings.isHeaderExtractionEnabled()) { + output = openWithHeaders(schemaNegotiator); + } else { + output = openWithoutHeaders(schemaNegotiator); + } + if (output == null) { + return false; + } + openReader(output); + return true; + } catch (final IOException e) { + throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger); + } + } + + /** + * Extract header and use that to define the reader schema. + */ + + private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException { + final String [] fieldNames = extractHeader(); + if (fieldNames == null) { + return null; + } + final TupleMetadata schema = new TupleSchema(); + for (final String colName : fieldNames) { + schema.addColumn(MetadataUtils.newScalar(colName, MinorType.VARCHAR, DataMode.REQUIRED)); + } + schemaNegotiator.setTableSchema(schema, true); + writer = schemaNegotiator.build().writer(); + return new FieldVarCharOutput(writer); + } + + /** + * When no headers, create a single array column "columns". + */ + + private TextOutput openWithoutHeaders( + ColumnsSchemaNegotiator schemaNegotiator) { + final TupleMetadata schema = new TupleSchema(); + schema.addColumn(MetadataUtils.newScalar(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR, DataMode.REPEATED)); + schemaNegotiator.setTableSchema(schema, true); + writer = schemaNegotiator.build().writer(); + return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes()); + } + + private void openReader(TextOutput output) throws IOException { + logger.trace("Opening file {}", split.getPath()); + final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath()); + final TextInput input = new TextInput(settings, stream, readBuffer, + split.getStart(), split.getStart() + split.getLength()); + + // setup Reader using Input and Output + reader = new TextReader(settings, input, output, whitespaceBuffer); + reader.start(); + } + + /** + * Extracts header from text file. 
+ * Currently it is assumed to be first line if headerExtractionEnabled is set to true + * TODO: enhance to support more common header patterns + * @return field name strings + */ + + private String [] extractHeader() throws IOException { + assert settings.isHeaderExtractionEnabled(); + + // don't skip header in case skipFirstLine is set true + settings.setSkipFirstLine(false); + + final HeaderBuilder hOutput = new HeaderBuilder(split.getPath()); + + // setup Input using InputStream + // we should read file header irrespective of split given given to this reader + final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath()); + final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength()); + + // setup Reader using Input and Output + this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer); + reader.start(); + + // extract first row only + reader.parseNext(); + + // grab the field names from output + final String [] fieldNames = hOutput.getHeaders(); + + // cleanup and set to skip the first line next time we read input + reader.close(); + settings.setSkipFirstLine(true); + + readBuffer.clear(); + whitespaceBuffer.clear(); + return fieldNames; + } + + /** + * Generates the next record batch + * @return number of records in the batch + */ + + @Override + public boolean next() { + reader.resetForNextBatch(); + + try { + boolean more = false; + while (! writer.isFull()) { + more = reader.parseNext(); + if (! more) { + break; + } + } + reader.finishBatch(); + + // Return false on the batch that hits EOF. The scan operator + // knows to process any rows in this final batch. + + return more && writer.rowCount() > 0; + } catch (IOException | TextParsingException e) { + if (e.getCause() != null && e.getCause() instanceof UserException) { + throw (UserException) e.getCause(); + } + throw UserException.dataReadError(e) + .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.", + split.getPath(), reader.getPos()) + .build(logger); + } + } + + /** + * Cleanup state once we are finished processing all the records. + * This would internally close the input stream we are reading from. + */ + @Override + public void close() { + + // Release the buffers allocated above. Double-check to handle + // unexpected multiple calls to close(). + + if (readBuffer != null) { + readBuffer.release(); + readBuffer = null; + } + if (whitespaceBuffer != null) { + whitespaceBuffer.release(); + whitespaceBuffer = null; + } + try { + if (reader != null) { + reader.close(); + reader = null; + } + } catch (final IOException e) { + logger.warn("Exception while closing stream.", e); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java new file mode 100644 index 000000000..df48a5548 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.TupleMetadata; + +/** + * Class is responsible for generating record batches for text file inputs. We generate + * a record batch with a set of varchar vectors. A varchar vector contains all the field + * values for a given column. Each record is a single value within each vector of the set. + */ +class FieldVarCharOutput extends BaseFieldOutput { + + /** + * We initialize and add the varchar vector for each incoming field in this + * constructor. + * @param outputMutator Used to create/modify schema + * @param fieldNames Incoming field names + * @param columns List of columns selected in the query + * @param isStarQuery boolean to indicate if all fields are selected or not + */ + public FieldVarCharOutput(RowSetLoader writer) { + super(writer, + TextReader.MAXIMUM_NUMBER_COLUMNS, + makeMask(writer)); + } + + private static boolean[] makeMask(RowSetLoader writer) { + final TupleMetadata schema = writer.tupleSchema(); + final boolean projectionMask[] = new boolean[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + projectionMask[i] = schema.metadata(i).isProjected(); + } + return projectionMask; + } + + @Override + public boolean endField() { + if (fieldProjected) { + writer.scalar(currentFieldIndex) + .setBytes(fieldBytes, currentDataPointer); + } + + return super.endField(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java new file mode 100644 index 000000000..62eafc898 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.fs.Path; + +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; + +/** + * Text output that implements a header reader/parser. + * The caller parses out the characters of each header; + * this class assembles UTF-8 bytes into Unicode characters, + * fixes invalid characters (those not legal for SQL symbols), + * and maps duplicate names to unique names. + * <p> + * That is, this class is as permissive as possible with file + * headers to avoid spurious query failures for trivial reasons. + */ + +// Note: this class uses Java heap strings and the usual Java +// convenience classes. Since we do heavy Unicode string operations, +// and read a single row, there is no good reason to try to use +// value vectors and direct memory for this task. + +public class HeaderBuilder extends TextOutput { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class); + + /** + * Maximum Drill symbol length, as enforced for headers. + * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier"> + * identifier documentation</a> + */ + // TODO: Replace with the proper constant, if available + public static final int MAX_HEADER_LEN = 1024; + + /** + * Prefix used to replace non-alphabetic characters at the start of + * a column name. For example, $foo becomes col_foo. Used + * because SQL does not allow _foo. + */ + + public static final String COLUMN_PREFIX = "col_"; + + /** + * Prefix used to create numbered columns for missing + * headers. Typical names: column_1, column_2, ... + */ + + public static final String ANONYMOUS_COLUMN_PREFIX = "column_"; + + public final Path filePath; + public final List<String> headers = new ArrayList<>(); + public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN); + + public HeaderBuilder(Path filePath) { + this.filePath = filePath; + } + + @Override + public void startField(int index) { + currentField.clear(); + } + + @Override + public boolean endField() { + String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8); + header = validateSymbol(header); + headers.add(header); + return true; + } + + @Override + public boolean endEmptyField() { + + // Empty header will be rewritten to "column_<n>". + + return endField(); + } + + /** + * Validate the header name according to the SQL lexical rules. + * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier"> + * identifier documentation</a> + * @param header the header name to validate + */ + + // TODO: Replace with existing code, if any. + private String validateSymbol(String header) { + header = header.trim(); + + // To avoid unnecessary query failures, just make up a column name + // if the name is missing or all blanks. + + if (header.isEmpty()) { + return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1); + } + if (! Character.isAlphabetic(header.charAt(0))) { + return rewriteHeader(header); + } + for (int i = 1; i < header.length(); i++) { + char ch = header.charAt(i); + if (! Character.isAlphabetic(ch) && + ! 
Character.isDigit(ch) && ch != '_') { + return rewriteHeader(header); + } + } + return header; + } + + /** + * Given an invalid header, rewrite it to replace illegal characters + * with valid ones. The header won't be what the user specified, + * but it will be a valid SQL identifier. This solution avoids failing + * queries due to corrupted or invalid header data. + * <p> + * Names with invalid first characters are mapped to "col_". Example: + * $foo maps to col_foo. If the only character is non-alphabetic, treat + * the column as anonymous and create a generic name: column_4, etc. + * <p> + * This mapping could create a column that exceeds the maximum length + * of 1024. Since that is not really a hard limit, we just live with the + * extra few characters. + * + * @param header the original header + * @return the rewritten header, valid for SQL + */ + + private String rewriteHeader(String header) { + final StringBuilder buf = new StringBuilder(); + + // If starts with non-alphabetic, can't map the character to + // underscore, so just tack on a prefix. + + char ch = header.charAt(0); + if (Character.isAlphabetic(ch)) { + buf.append(ch); + } else if (Character.isDigit(ch)) { + buf.append(COLUMN_PREFIX); + buf.append(ch); + + // For the strange case of only one character, format + // the same as an empty header. + + } else if (header.length() == 1) { + return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1); + } else { + buf.append(COLUMN_PREFIX); + } + + // Convert all remaining invalid characters to underscores + + for (int i = 1; i < header.length(); i++) { + ch = header.charAt(i); + if (Character.isAlphabetic(ch) || + Character.isDigit(ch) || ch == '_') { + buf.append(ch); + } else { + buf.append("_"); + } + } + return buf.toString(); + } + + @Override + public void append(byte data) { + + // Ensure the data fits. Note that, if the name is Unicode, the actual + // number of characters might be less than the limit even though the + // byte count exceeds the limit. Fixing this, in general, would require + // a buffer four times larger, so we leave that as a later improvement + // if ever needed. + + try { + currentField.put(data); + } catch (BufferOverflowException e) { + throw UserException.dataReadError() + .message("Column exceeds maximum length of %d", MAX_HEADER_LEN) + .addContext("File Path", filePath.toString()) + .build(logger); + } + } + + @Override + public void finishRecord() { + if (headers.isEmpty()) { + throw UserException.dataReadError() + .message("The file must define at least one header.") + .addContext("File Path", filePath.toString()) + .build(logger); + } + + // Force headers to be unique. + + final Set<String> idents = new HashSet<String>(); + for (int i = 0; i < headers.size(); i++) { + String header = headers.get(i); + String key = header.toLowerCase(); + + // Is the header a duplicate? + + if (idents.contains(key)) { + + // Make header unique by appending a suffix. + // This loop must end because we have a finite + // number of headers. + // The original column is assumed to be "1", so + // the first duplicate is "2", and so on. + // Note that this will map columns of the form: + // "col,col,col_2,col_2_2" to + // "col", "col_2", "col_2_2", "col_2_2_2". + // No mapping scheme is perfect... + + for (int l = 2; ; l++) { + final String rewritten = header + "_" + l; + key = rewritten.toLowerCase(); + if (! 
idents.contains(key)) { + headers.set(i, rewritten); + break; + } + } + } + idents.add(key); + } + } + + @Override + public void startRecord() { } + + public String[] getHeaders() { + + // Just return the headers: any needed checks were done in + // finishRecord() + + final String array[] = new String[headers.size()]; + return headers.toArray(array); + } + + // Not used. + @Override + public long getRecordCount() { return 0; } + + @Override + public boolean isFull() { return false; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java new file mode 100644 index 000000000..13b44509e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; + +/** + * Class is responsible for generating record batches for text file inputs. We generate + * a record batch with a single vector of type repeated varchar vector. Each record is a single + * value within the vector containing all the fields in the record as individual array elements. + */ +public class RepeatedVarCharOutput extends BaseFieldOutput { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); + + private final ScalarWriter columnWriter; + private final ArrayWriter arrayWriter; + + /** + * Provide the row set loader (which must have just one repeated Varchar + * column) and an optional array projection mask. + * @param projectionMask + * @param tupleLoader + */ + + public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) { + super(loader, + maxField(loader, projectionMask), + projectionMask); + arrayWriter = writer.array(0); + columnWriter = arrayWriter.scalar(); + } + + private static int maxField(RowSetLoader loader, boolean[] projectionMask) { + + // If the one and only field (`columns`) is not selected, then this + // is a COUNT(*) or similar query. Select nothing. + + if (! loader.tupleSchema().metadata(0).isProjected()) { + return -1; + } + + // If this is SELECT * or SELECT `columns` query, project all + // possible fields. + + if (projectionMask == null) { + return TextReader.MAXIMUM_NUMBER_COLUMNS; + } + + // Else, this is a SELECT columns[x], columns[y], ... query. 
+ // Project only the requested element members (fields). + + int end = projectionMask.length - 1; + while (end >= 0 && ! projectionMask[end]) { + end--; + } + return end; + } + + /** + * Write the value into an array position. Rules: + * <ul> + * <li>If there is no projection mask, collect all columns.</li> + * <li>If a selection mask is present, we previously found the index + * of the last projection column (<tt>maxField</tt>). If the current + * column is beyond that number, ignore the data and stop accepting + * columns.</li> + * <li>If the column is projected, add the data to the array.</li> + * <li>If the column is not projected, add a blank value to the + * array.</li> + * </ul> + * The above ensures that we leave no holes in the portion of the + * array that is projected (by adding blank columns where needed), + * and we just ignore columns past the end of the projected part + * of the array. (No need to fill holes at the end.) + */ + + @Override + public boolean endField() { + + // Skip the field if past the set of projected fields. + + if (currentFieldIndex > maxField) { + return false; + } + + // If the field is projected, save it. + + if (fieldProjected) { + + // Repeated var char will create as many entries as there are columns. + // If this would exceed the maximum, issue an error. Note that we do + // this only if all fields are selected; the same query will succeed if + // the user does a COUNT(*) or SELECT columns[x], columns[y], ... + + if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) { + throw UserException + .unsupportedError() + .message("Text file contains too many fields") + .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS) + .build(logger); + } + + // Save the field. + + columnWriter.setBytes(fieldBytes, currentDataPointer); + } else { + + // The field is not projected. + // Must write a value into this array position, but + // the value should be empty. + + columnWriter.setBytes(fieldBytes, 0); + } + + // Return whether the rest of the fields should be read. + + return super.endField(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java new file mode 100644 index 000000000..70c43b7ee --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +class StreamFinishedPseudoException extends RuntimeException { + + public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException(); + + private StreamFinishedPseudoException() { + super("", null, false, true); + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java new file mode 100644 index 000000000..26fade6d7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import io.netty.buffer.DrillBuf; +import io.netty.util.internal.PlatformDependent; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.compress.CompressionInputStream; + +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck; + +/** + * Class that fronts an InputStream to provide a byte consumption interface. + * Also manages only reading lines to and from each split. + */ +final class TextInput { + + private final byte[] lineSeparator; + private final byte normalizedLineSeparator; + private final TextParsingSettingsV3 settings; + + private long lineCount; + private long charCount; + + /** + * The starting position in the file. + */ + private final long startPos; + private final long endPos; + + private long streamPos; + + private final Seekable seekable; + private final FSDataInputStream inputFS; + private final InputStream input; + + private final DrillBuf buffer; + private final ByteBuffer underlyingBuffer; + private final long bStart; + private final long bStartMinus1; + + private final boolean bufferReadable; + + /** + * Whether there was a possible partial line separator on the previous + * read so we dropped it and it should be appended to next read. + */ + private int remByte = -1; + + /** + * The current position in the buffer. + */ + public int bufferPtr; + + /** + * The quantity of valid data in the buffer. + */ + public int length = -1; + + private boolean endFound = false; + + /** + * Creates a new instance with the mandatory characters for handling newlines + * transparently. 
lineSeparator the sequence of characters that represent a + * newline, as defined in {@link Format#getLineSeparator()} + * normalizedLineSeparator the normalized newline character (as defined in + * {@link Format#getNormalizedNewline()}) that is used to replace any + * lineSeparator sequence found in the input. + */ + public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) { + this.lineSeparator = settings.getNewLineDelimiter(); + byte normalizedLineSeparator = settings.getNormalizedNewLine(); + Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable."); + boolean isCompressed = input instanceof CompressionInputStream; + Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream."); + + // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely. + if (isCompressed && endPos > 0) { + endPos = Long.MAX_VALUE; + } + + this.input = input; + this.seekable = (Seekable) input; + this.settings = settings; + + if (input instanceof FSDataInputStream) { + this.inputFS = (FSDataInputStream) input; + this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable; + } else { + this.inputFS = null; + this.bufferReadable = false; + } + + this.startPos = startPos; + this.endPos = endPos; + + this.normalizedLineSeparator = normalizedLineSeparator; + + this.buffer = readBuffer; + this.bStart = buffer.memoryAddress(); + this.bStartMinus1 = bStart -1; + this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity()); + } + + /** + * Test the input to position for read start. If the input is a non-zero split or + * splitFirstLine is enabled, input will move to appropriate complete line. + * @throws IOException for input file read errors + */ + final void start() throws IOException { + lineCount = 0; + if(startPos > 0){ + seekable.seek(startPos); + } + + updateBuffer(); + if (length > 0) { + if (startPos > 0 || settings.isSkipFirstLine()) { + + // move to next full record. + skipLines(1); + } + } + } + + + /** + * Helper method to get the most recent characters consumed since the last record started. + * May get an incomplete string since we don't support stream rewind. Returns empty string for now. + * + * @return String of last few bytes. + * @throws IOException for input file read errors + */ + public String getStringSinceMarkForError() throws IOException { + return " "; + } + + long getPos() { + return streamPos + bufferPtr; + } + + public void mark() { } + + /** + * Read some more bytes from the stream. Uses the zero copy interface if available. + * Otherwise, does byte copy. + * + * @throws IOException for input file read errors + */ + private void read() throws IOException { + if (bufferReadable) { + + if (remByte != -1) { + for (int i = 0; i <= remByte; i++) { + underlyingBuffer.put(lineSeparator[i]); + } + remByte = -1; + } + length = inputFS.read(underlyingBuffer); + + } else { + byte[] b = new byte[underlyingBuffer.capacity()]; + if (remByte != -1){ + int remBytesNum = remByte + 1; + System.arraycopy(lineSeparator, 0, b, 0, remBytesNum); + length = input.read(b, remBytesNum, b.length - remBytesNum); + remByte = -1; + } else { + length = input.read(b); + } + underlyingBuffer.put(b); + } + } + + + /** + * Read more data into the buffer. Will also manage split end conditions. 
+ * + * @throws IOException for input file read errors + */ + private void updateBuffer() throws IOException { + streamPos = seekable.getPos(); + underlyingBuffer.clear(); + + if (endFound) { + length = -1; + return; + } + + read(); + + // check our data read allowance. + if (streamPos + length >= this.endPos) { + updateLengthBasedOnConstraint(); + } + + charCount += bufferPtr; + bufferPtr = 1; + + buffer.writerIndex(underlyingBuffer.limit()); + buffer.readerIndex(underlyingBuffer.position()); + } + + /** + * Checks to see if we can go over the end of our bytes constraint on the data. If so, + * adjusts so that we can only read to the last character of the first line that crosses + * the split boundary. + */ + private void updateLengthBasedOnConstraint() { + final long max = bStart + length; + for (long m = bStart + (endPos - streamPos); m < max; m++) { + for (int i = 0; i < lineSeparator.length; i++) { + long mPlus = m + i; + if (mPlus < max) { + // we found a line separator and don't need to consult the next byte. + if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) { + length = (int) (mPlus - bStart) + 1; + endFound = true; + return; + } + } else { + // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read. + remByte = i; + length = length - i; + return; + } + } + } + } + + /** + * Get next byte from stream. Also maintains the current line count. Will throw a + * {@link StreamFinishedPseudoException} when the stream has run out of bytes. + * + * @return next byte from stream. + * @throws IOException for input file read errors + */ + public final byte nextChar() throws IOException { + byte byteChar = nextCharNoNewLineCheck(); + int bufferPtrTemp = bufferPtr - 1; + if (byteChar == lineSeparator[0]) { + for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) { + if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) { + return byteChar; + } + } + + lineCount++; + byteChar = normalizedLineSeparator; + + // we don't need to update buffer position if line separator is one byte long + if (lineSeparator.length > 1) { + bufferPtr += (lineSeparator.length - 1); + if (bufferPtr >= length) { + if (length != -1) { + updateBuffer(); + } else { + throw StreamFinishedPseudoException.INSTANCE; + } + } + } + } + + return byteChar; + } + + /** + * Get next byte from stream. Do no maintain any line count Will throw a StreamFinishedPseudoException + * when the stream has run out of bytes. + * + * @return next byte from stream. + * @throws IOException for input file read errors + */ + public final byte nextCharNoNewLineCheck() throws IOException { + + if (length == -1) { + throw StreamFinishedPseudoException.INSTANCE; + } + + rangeCheck(buffer, bufferPtr - 1, bufferPtr); + + byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr); + + if (bufferPtr >= length) { + if (length != -1) { + updateBuffer(); + bufferPtr--; + } else { + throw StreamFinishedPseudoException.INSTANCE; + } + } + + bufferPtr++; + return byteChar; + } + + /** + * Number of lines read since the start of this split. + * @return current line count + */ + public final long lineCount() { + return lineCount; + } + + /** + * Skip forward the number of line delimiters. If you are in the middle of a line, + * a value of 1 will skip to the start of the next record. + * + * @param lines Number of lines to skip. 
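To make the split-boundary rule above concrete, here is a self-contained sketch using plain byte arrays in place of DrillBuf (class and method names are invented for the example): a split whose nominal end falls inside a record keeps reading until the first complete line separator at or after that offset, while the reader of the next split drops its leading partial line via skipLines(1) in start(), so each record is read exactly once. The remByte bookkeeping above covers the case where only part of a multi-byte separator fits in the current read.

public class SplitBoundaryDemo {
  /** Returns the number of leading bytes this split should consume. */
  static int lengthForSplit(byte[] data, int splitEnd, byte[] lineSep) {
    for (int i = splitEnd; i < data.length; i++) {
      if (matchesAt(data, i, lineSep)) {
        return i + lineSep.length;     // include the separator itself
      }
    }
    return data.length;                // no separator after the boundary
  }

  static boolean matchesAt(byte[] data, int pos, byte[] sep) {
    if (pos + sep.length > data.length) {
      return false;
    }
    for (int i = 0; i < sep.length; i++) {
      if (data[pos + i] != sep[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    byte[] data = "a,b\nc,d\ne,f\n".getBytes(java.nio.charset.StandardCharsets.US_ASCII);
    // The split nominally ends inside the second record (offset 5): the reader
    // still consumes through the end of "c,d\n" (offset 8) and stops there.
    System.out.println(lengthForSplit(data, 5, new byte[] {'\n'}));  // 8
  }
}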
+ * @throws IOException for input file read errors + * @throws IllegalArgumentException if unable to skip the requested number + * of lines + */ + public final void skipLines(int lines) throws IOException { + if (lines < 1) { + return; + } + long expectedLineCount = this.lineCount + lines; + + try { + do { + nextChar(); + } while (lineCount < expectedLineCount); + if (lineCount < lines) { + throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached"); + } + } catch (EOFException ex) { + throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached"); + } + } + + public final long charCount() { + return charCount + bufferPtr; + } + + public long getLineCount() { + return lineCount; + } + + public void close() throws IOException{ + input.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java new file mode 100644 index 000000000..48c184991 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + + +/** + * Base class for producing output record batches while dealing with + * text files. Defines the interface called from text parsers to create + * the corresponding value vectors (record batch). + */ + +abstract class TextOutput { + + public abstract void startRecord(); + + /** + * Start processing a new field within a record. + * @param index index within the record + */ + public abstract void startField(int index); + + /** + * End processing a field within a record. + * @return true if engine should continue processing record. false if rest of record can be skipped. + */ + public abstract boolean endField(); + + /** + * Shortcut that lets the output know that we are closing ending a field with no data. + * @return true if engine should continue processing record. false if rest of record can be skipped. + */ + public abstract boolean endEmptyField(); + + /** + * Add the provided data but drop any whitespace. + * @param data character to append + */ + public void appendIgnoringWhitespace(byte data) { + if (TextReader.isWhite(data)) { + // noop + } else { + append(data); + } + } + + /** + * Appends a byte to the output character data buffer + * @param data current byte read + */ + public abstract void append(byte data); + + /** + * Completes the processing of a given record. Also completes the processing of the + * last field being read. 
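For orientation, a minimal sketch of the call protocol a parser follows against this kind of output sink: one startRecord(), then startField(i), zero or more append(b) calls and an endField() (or endEmptyField()) per field, and a final finishRecord(). The FieldSink and RowPrinter names are invented for the example and mirror only the subset of methods shown here.

public class FieldSinkDemo {
  interface FieldSink {
    void startRecord();
    void startField(int index);
    void append(byte data);
    boolean endField();       // false would let the parser skip the rest of the record
    void finishRecord();
  }

  static class RowPrinter implements FieldSink {
    private final StringBuilder field = new StringBuilder();
    private final java.util.List<String> row = new java.util.ArrayList<>();
    @Override public void startRecord() { row.clear(); }
    @Override public void startField(int index) { field.setLength(0); }
    @Override public void append(byte data) { field.append((char) data); }
    @Override public boolean endField() { row.add(field.toString()); return true; }
    @Override public void finishRecord() { System.out.println(row); }
  }

  public static void main(String[] args) {
    FieldSink out = new RowPrinter();
    out.startRecord();
    out.startField(0); out.append((byte) 'a'); out.endField();
    out.startField(1); out.append((byte) 'b'); out.endField();
    out.finishRecord();                       // prints [a, b]
  }
}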
+ */ + public abstract void finishRecord(); + + /** + * Return the total number of records (across batches) processed + */ + public abstract long getRecordCount(); + + /** + * Indicates if the current batch is full and reading for this batch + * should stop. + * + * @return true if the batch is full and the reader must exit to allow + * the batch to be sent downstream, false if the reader may continue to + * add rows to the current batch + */ + + public abstract boolean isFull(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java new file mode 100644 index 000000000..86cad4c88 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.io.IOException; + +import com.univocity.parsers.common.ParsingContext; + +class TextParsingContext implements ParsingContext { + + private final TextInput input; + private final TextOutput output; + protected boolean stopped; + + private int[] extractedIndexes; + + public TextParsingContext(TextInput input, TextOutput output) { + this.input = input; + this.output = output; + } + + /** + * {@inheritDoc} + */ + @Override + public void stop() { + stopped = true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isStopped() { + return stopped; + } + + /** + * {@inheritDoc} + */ + @Override + public long currentLine() { + return input.lineCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public long currentChar() { + return input.charCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public int currentColumn() { + return -1; + } + + /** + * {@inheritDoc} + */ + @Override + public String[] headers() { + return new String[]{}; + } + + /** + * {@inheritDoc} + */ + @Override + public int[] extractedFieldIndexes() { + return extractedIndexes; + } + + /** + * {@inheritDoc} + */ + @Override + public long currentRecord() { + return output.getRecordCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public String currentParsedContent() { + try { + return input.getStringSinceMarkForError(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void skipLines(int lines) { + } + + @Override + public boolean columnsReordered() { + return false; + } + + public boolean isFull() { + return output.isFull(); + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java new file mode 100644 index 000000000..0341b4554 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig; + +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; + +// TODO: Remove the "V3" suffix once the V2 version is retired. +public class TextParsingSettingsV3 { + + public static final TextParsingSettingsV3 DEFAULT = new TextParsingSettingsV3(); + + private String emptyValue = null; + private boolean parseUnescapedQuotes = true; + private byte quote = b('"'); + private byte quoteEscape = b('"'); + private byte delimiter = b(','); + private byte comment = b('#'); + + private long maxCharsPerColumn = Character.MAX_VALUE; + private byte normalizedNewLine = b('\n'); + private byte[] newLineDelimiter = {normalizedNewLine}; + private boolean ignoreLeadingWhitespaces; + private boolean ignoreTrailingWhitespaces; + private String lineSeparatorString = "\n"; + private boolean skipFirstLine; + + private boolean headerExtractionEnabled; + private boolean useRepeatedVarChar = true; + + public void set(TextFormatConfig config){ + this.quote = bSafe(config.getQuote(), "quote"); + this.quoteEscape = bSafe(config.getEscape(), "escape"); + this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8); + this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter"); + this.comment = bSafe(config.getComment(), "comment"); + this.skipFirstLine = config.isSkipFirstLine(); + this.headerExtractionEnabled = config.isHeaderExtractionEnabled(); + if (this.headerExtractionEnabled) { + // In case of header TextRecordReader will use set of VarChar vectors vs RepeatedVarChar + this.useRepeatedVarChar = false; + } + } + + public byte getComment() { + return comment; + } + + public boolean isSkipFirstLine() { + return skipFirstLine; + } + + public void setSkipFirstLine(boolean skipFirstLine) { + this.skipFirstLine = skipFirstLine; + } + + public boolean isUseRepeatedVarChar() { + return useRepeatedVarChar; + } + + public void setUseRepeatedVarChar(boolean useRepeatedVarChar) { + this.useRepeatedVarChar = useRepeatedVarChar; + } + + private static byte bSafe(char c, String name) { + if (c > Byte.MAX_VALUE) { + throw new IllegalArgumentException(String.format("Failure validating configuration option %s. 
Expected a " + + "character between 0 and 127 but value was actually %d.", name, (int) c)); + } + return (byte) c; + } + + private static byte b(char c) { + return (byte) c; + } + + public byte[] getNewLineDelimiter() { + return newLineDelimiter; + } + + /** + * Returns the character used for escaping values where the field delimiter is + * part of the value. Defaults to '"' + * + * @return the quote character + */ + public byte getQuote() { + return quote; + } + + /** + * Defines the character used for escaping values where the field delimiter is + * part of the value. Defaults to '"' + * + * @param quote + * the quote character + */ + public void setQuote(byte quote) { + this.quote = quote; + } + + public String getLineSeparatorString() { + return lineSeparatorString; + } + + /** + * Identifies whether or not a given character is used for escaping values + * where the field delimiter is part of the value + * + * @param ch + * the character to be verified + * @return true if the given character is the character used for escaping + * values, false otherwise + */ + public boolean isQuote(byte ch) { + return this.quote == ch; + } + + /** + * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"' + * @return the quote escape character + */ + public byte getQuoteEscape() { + return quoteEscape; + } + + /** + * Defines the character used for escaping quotes inside an already quoted + * value. Defaults to '"' + * + * @param quoteEscape + * the quote escape character + */ + public void setQuoteEscape(byte quoteEscape) { + this.quoteEscape = quoteEscape; + } + + /** + * Identifies whether or not a given character is used for escaping quotes + * inside an already quoted value. + * + * @param ch + * the character to be verified + * @return true if the given character is the quote escape character, false + * otherwise + */ + public boolean isQuoteEscape(byte ch) { + return this.quoteEscape == ch; + } + + /** + * Returns the field delimiter character. Defaults to ',' + * @return the field delimiter character + */ + public byte getDelimiter() { + return delimiter; + } + + /** + * Defines the field delimiter character. Defaults to ',' + * @param delimiter the field delimiter character + */ + public void setDelimiter(byte delimiter) { + this.delimiter = delimiter; + } + + /** + * Identifies whether or not a given character represents a field delimiter + * @param ch the character to be verified + * @return true if the given character is the field delimiter character, false otherwise + */ + public boolean isDelimiter(byte ch) { + return this.delimiter == ch; + } + + /** + * Returns the String representation of an empty value (defaults to null) + * + * <p> + * When reading, if the parser does not read any character from the input, and + * the input is within quotes, the empty is used instead of an empty string + * + * @return the String representation of an empty value + */ + public String getEmptyValue() { + return emptyValue; + } + + /** + * Sets the String representation of an empty value (defaults to null) + * + * <p> + * When reading, if the parser does not read any character from the input, and + * the input is within quotes, the empty is used instead of an empty string + * + * @param emptyValue + * the String representation of an empty value + */ + public void setEmptyValue(String emptyValue) { + this.emptyValue = emptyValue; + } + + /** + * Indicates whether the CSV parser should accept unescaped quotes inside + * quoted values and parse them normally. 
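As a usage sketch (a hypothetical helper, not part of the patch), the setters defined in this class can be combined to describe, say, a pipe-delimited file whose first row carries column names; the byte casts reflect the single-byte character model the reader uses throughout.

import org.apache.drill.exec.store.easy.text.compliant.v3.TextParsingSettingsV3;

public class SettingsSketch {
  static TextParsingSettingsV3 pipeDelimitedWithHeader() {
    TextParsingSettingsV3 settings = new TextParsingSettingsV3();
    settings.setDelimiter((byte) '|');          // field delimiter
    settings.setQuote((byte) '"');
    settings.setQuoteEscape((byte) '"');        // doubled-quote escaping
    settings.setHeaderExtractionEnabled(true);  // first row supplies column names
    settings.setUseRepeatedVarChar(false);      // per-column vectors instead of `columns`
    return settings;
  }
}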
Defaults to {@code true}. + * + * @return a flag indicating whether or not the CSV parser should accept + * unescaped quotes inside quoted values. + */ + public boolean isParseUnescapedQuotes() { + return parseUnescapedQuotes; + } + + /** + * Configures how to handle unescaped quotes inside quoted values. If set to + * {@code true}, the parser will parse the quote normally as part of the + * value. If set the {@code false}, a + * {@link com.univocity.parsers.common.TextParsingException} will be thrown. + * Defaults to {@code true}. + * + * @param parseUnescapedQuotes + * indicates whether or not the CSV parser should accept unescaped + * quotes inside quoted values. + */ + public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) { + this.parseUnescapedQuotes = parseUnescapedQuotes; + } + + /** + * Indicates whether or not the first valid record parsed from the input + * should be considered as the row containing the names of each column + * + * @return true if the first valid record parsed from the input should be + * considered as the row containing the names of each column, false + * otherwise + */ + public boolean isHeaderExtractionEnabled() { + return headerExtractionEnabled; + } + + /** + * Defines whether or not the first valid record parsed from the input should + * be considered as the row containing the names of each column + * + * @param headerExtractionEnabled + * a flag indicating whether the first valid record parsed from the + * input should be considered as the row containing the names of each + * column + */ + public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) { + this.headerExtractionEnabled = headerExtractionEnabled; + } + + public long getMaxCharsPerColumn() { + return maxCharsPerColumn; + } + + public void setMaxCharsPerColumn(long maxCharsPerColumn) { + this.maxCharsPerColumn = maxCharsPerColumn; + } + + public void setComment(byte comment) { + this.comment = comment; + } + + public byte getNormalizedNewLine() { + return normalizedNewLine; + } + + public void setNormalizedNewLine(byte normalizedNewLine) { + this.normalizedNewLine = normalizedNewLine; + } + + public boolean isIgnoreLeadingWhitespaces() { + return ignoreLeadingWhitespaces; + } + + public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) { + this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces; + } + + public boolean isIgnoreTrailingWhitespaces() { + return ignoreTrailingWhitespaces; + } + + public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) { + this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java new file mode 100644 index 000000000..17a076c0c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import io.netty.buffer.DrillBuf; + +import java.io.IOException; + +import org.apache.drill.common.exceptions.UserException; + +import com.univocity.parsers.common.TextParsingException; + +/******************************************************************************* + * Portions Copyright 2014 uniVocity Software Pty Ltd + ******************************************************************************/ + +/** + * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and + * DrillBuf support. + */ +public final class TextReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class); + + private static final byte NULL_BYTE = (byte) '\0'; + + public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024; + + private final TextParsingContext context; + + private final TextParsingSettingsV3 settings; + + private final TextInput input; + private final TextOutput output; + private final DrillBuf workBuf; + + private byte ch; + + // index of the field within this record + private int fieldIndex; + + /** Behavior settings **/ + private final boolean ignoreTrailingWhitespace; + private final boolean ignoreLeadingWhitespace; + private final boolean parseUnescapedQuotes; + + /** Key Characters **/ + private final byte comment; + private final byte delimiter; + private final byte quote; + private final byte quoteEscape; + private final byte newLine; + + /** + * The CsvParser supports all settings provided by {@link TextParsingSettingsV3}, + * and requires this configuration to be properly initialized. + * @param settings the parser configuration + * @param input input stream + * @param output interface to produce output record batch + * @param workBuf working buffer to handle whitespace + */ + public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) { + this.context = new TextParsingContext(input, output); + this.workBuf = workBuf; + this.settings = settings; + + this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces(); + this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces(); + this.parseUnescapedQuotes = settings.isParseUnescapedQuotes(); + this.delimiter = settings.getDelimiter(); + this.quote = settings.getQuote(); + this.quoteEscape = settings.getQuoteEscape(); + this.newLine = settings.getNormalizedNewLine(); + this.comment = settings.getComment(); + + this.input = input; + this.output = output; + } + + public TextOutput getOutput() { return output; } + + /** + * Check if the given byte is a white space. As per the univocity text reader + * any ASCII <= ' ' is considered a white space. 
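A standalone sketch of the check implemented just below (the signed-byte caveat it guards against is spelled out in the sentence that follows): without the "> -1" guard, the negative byte values produced by UTF-8 multi-byte characters would compare as less than the space character and be misclassified as whitespace. The class name is invented for the example.

import java.nio.charset.StandardCharsets;

public class WhitespaceCheckDemo {
  static boolean isWhite(byte b) {
    return b <= ' ' && b > -1;
  }

  public static void main(String[] args) {
    for (byte b : "é".getBytes(StandardCharsets.UTF_8)) {
      // UTF-8 for 'é' is 0xC3 0xA9, i.e. -61 and -87 as signed Java bytes;
      // both are numerically <= ' ' but must not be treated as whitespace.
      System.out.println(b + " -> isWhite = " + isWhite(b));
    }
    System.out.println((byte) ' ' + " -> isWhite = " + isWhite((byte) ' '));  // true
  }
}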
However since byte in JAVA is signed + * we have an additional check to make sure its not negative + */ + static final boolean isWhite(byte b){ + return b <= ' ' && b > -1; + } + + /** + * Inform the output interface to indicate we are starting a new record batch + */ + public void resetForNextBatch() { } + + public long getPos() { return input.getPos(); } + + /** + * Function encapsulates parsing an entire record, delegates parsing of the + * fields to parseField() function. + * We mark the start of the record and if there are any failures encountered (OOM for eg) + * then we reset the input stream to the marked position + * @return true if parsing this record was successful; false otherwise + * @throws IOException for input file read errors + */ + private boolean parseRecord() throws IOException { + final byte newLine = this.newLine; + final TextInput input = this.input; + + input.mark(); + + fieldIndex = 0; + if (ignoreLeadingWhitespace && isWhite(ch)) { + skipWhitespace(); + } + + output.startRecord(); + int fieldsWritten = 0; + try { + @SuppressWarnings("unused") + boolean earlyTerm = false; + while (ch != newLine) { + earlyTerm = ! parseField(); + fieldsWritten++; + if (ch != newLine) { + ch = input.nextChar(); + if (ch == newLine) { + output.startField(fieldsWritten++); + output.endEmptyField(); + break; + } + } + + // Disabling early termination. See DRILL-5914 + +// if (earlyTerm) { +// if (ch != newLine) { +// input.skipLines(1); +// } +// break; +// } + } + output.finishRecord(); + } catch (StreamFinishedPseudoException e) { + + // if we've written part of a field or all of a field, we should send this row. + + if (fieldsWritten == 0) { + throw e; + } else { + output.finishRecord(); + } + } + return true; + } + + /** + * Function parses an individual field and ignores any white spaces encountered + * by not appending it to the output vector + * @throws IOException for input file read errors + */ + private void parseValueIgnore() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + appendIgnoringWhitespace(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + public void appendIgnoringWhitespace(byte data) { + if (! isWhite(data)) { + output.append(data); + } + } + + /** + * Function parses an individual field and appends all characters till the delimeter (or newline) + * to the output, including white spaces + * @throws IOException for input file read errors + */ + private void parseValueAll() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + output.append(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + /** + * Function simply delegates the parsing of a single field to the actual + * implementation based on parsing config + * + * @throws IOException + * for input file read errors + */ + private void parseValue() throws IOException { + if (ignoreTrailingWhitespace) { + parseValueIgnore(); + } else { + parseValueAll(); + } + } + + /** + * Recursive function invoked when a quote is encountered. Function also + * handles the case when there are non-white space characters in the field + * after the quoted value. 
+ * @param prev previous byte read + * @throws IOException for input file read errors + */ + private void parseQuotedValue(byte prev) throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + final byte quote = this.quote; + + ch = input.nextCharNoNewLineCheck(); + + while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) { + if (ch != quote) { + if (prev == quote) { // unescaped quote detected + if (parseUnescapedQuotes) { + output.append(quote); + output.append(ch); + parseQuotedValue(ch); + break; + } else { + throw new TextParsingException( + context, + "Unescaped quote character '" + + quote + + "' inside quoted value of CSV field. To allow unescaped quotes, " + + "set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. " + + "Cannot parse CSV input."); + } + } + output.append(ch); + prev = ch; + } else if (prev == quoteEscape) { + output.append(quote); + prev = NULL_BYTE; + } else { + prev = ch; + } + ch = input.nextCharNoNewLineCheck(); + } + + // Handles whitespace after quoted value: + // Whitespace are ignored (i.e., ch <= ' ') if they are not used as delimiters (i.e., ch != ' ') + // For example, in tab-separated files (TSV files), '\t' is used as delimiter and should not be ignored + // Content after whitespace may be parsed if 'parseUnescapedQuotes' is enabled. + if (ch != newLine && ch <= ' ' && ch != delimiter) { + final DrillBuf workBuf = this.workBuf; + workBuf.resetWriterIndex(); + do { + // saves whitespace after value + workBuf.writeByte(ch); + ch = input.nextChar(); + // found a new line, go to next record. + if (ch == newLine) { + return; + } + } while (ch <= ' ' && ch != delimiter); + + // there's more stuff after the quoted value, not only empty spaces. + if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) { + + output.append(quote); + for(int i =0; i < workBuf.writerIndex(); i++){ + output.append(workBuf.getByte(i)); + } + // the next character is not the escape character, put it there + if (ch != quoteEscape) { + output.append(ch); + } + // sets this character as the previous character (may be escaping) + // calls recursively to keep parsing potentially quoted content + parseQuotedValue(ch); + } + } + + if (!(ch == delimiter || ch == newLine)) { + throw new TextParsingException(context, "Unexpected character '" + ch + + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input."); + } + } + + /** + * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function + * @return true if more rows can be read, false if not + * @throws IOException for input file read errors + */ + private final boolean parseField() throws IOException { + + output.startField(fieldIndex++); + + if (isWhite(ch) && ignoreLeadingWhitespace) { + skipWhitespace(); + } + + // Delimiter? Then this is an empty field. + + if (ch == delimiter) { + return output.endEmptyField(); + } + + // Have the first character of the field. Parse and save the + // field, even if we hit EOF. (An EOF identifies a last line + // that contains data, but is not terminated with a newline.) + + try { + if (ch == quote) { + parseQuotedValue(NULL_BYTE); + } else { + parseValue(); + } + return output.endField(); + } catch (StreamFinishedPseudoException e) { + return output.endField(); + } + } + + /** + * Helper function to skip white spaces occurring at the current input stream. 
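To ground the quoting rules handled by parseQuotedValue() above, a standalone sketch (invented class name; assumes the quote and quote-escape characters are both '"', the defaults): the field ends at an unescaped quote, and a doubled quote inside the field yields a single literal quote in the output.

public class QuotedFieldDemo {
  static String readQuotedField(String input, int start) {
    StringBuilder out = new StringBuilder();
    int i = start + 1;                       // skip the opening quote
    while (i < input.length()) {
      char c = input.charAt(i);
      if (c == '"') {
        if (i + 1 < input.length() && input.charAt(i + 1) == '"') {
          out.append('"');                   // escaped quote: "" -> "
          i += 2;
          continue;
        }
        break;                               // closing quote ends the field
      }
      out.append(c);
      i++;
    }
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(readQuotedField("\"say \"\"hi\"\", then stop\",next", 0));
    // prints: say "hi", then stop
  }
}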
+ * @throws IOException for input file read errors + */ + private void skipWhitespace() throws IOException { + final byte delimiter = this.delimiter; + final byte newLine = this.newLine; + final TextInput input = this.input; + + while (isWhite(ch) && ch != delimiter && ch != newLine) { + ch = input.nextChar(); + } + } + + /** + * Starting point for the reader. Sets up the input interface. + * @throws IOException for input file read errors + */ + public final void start() throws IOException { + context.stopped = false; + input.start(); + } + + /** + * Parses the next record from the input. Will skip the line if its a comment, + * this is required when the file contains headers + * @throws IOException for input file read errors + */ + public final boolean parseNext() throws IOException { + try { + while (! context.stopped) { + ch = input.nextChar(); + if (ch == comment) { + input.skipLines(1); + continue; + } + break; + } + final long initialLineNumber = input.lineCount(); + boolean success = parseRecord(); + if (initialLineNumber + 1 < input.lineCount()) { + throw new TextParsingException(context, "Cannot use newline character within quoted string"); + } + + return success; + } catch (UserException ex) { + stopParsing(); + throw ex; + } catch (StreamFinishedPseudoException ex) { + stopParsing(); + return false; + } catch (Exception ex) { + try { + throw handleException(ex); + } finally { + stopParsing(); + } + } + } + + private void stopParsing() { } + + private String displayLineSeparators(String str, boolean addNewLine) { + if (addNewLine) { + if (str.contains("\r\n")) { + str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t"); + } else if (str.contains("\n")) { + str = str.replaceAll("\\n", "[\\\\n]\n\t"); + } else { + str = str.replaceAll("\\r", "[\\\\r]\r\t"); + } + } else { + str = str.replaceAll("\\n", "\\\\n"); + str = str.replaceAll("\\r", "\\\\r"); + } + return str; + } + + /** + * Helper method to handle exceptions caught while processing text files and generate better error messages associated with + * the exception. + * @param ex Exception raised + * @throws IOException for input file read errors + */ + private TextParsingException handleException(Exception ex) throws IOException { + + if (ex instanceof TextParsingException) { + throw (TextParsingException) ex; + } + + String message = null; + String tmp = input.getStringSinceMarkForError(); + char[] chars = tmp.toCharArray(); + if (chars != null) { + int length = chars.length; + if (length > settings.getMaxCharsPerColumn()) { + message = "Length of parsed input (" + length + + ") exceeds the maximum number of characters defined in your parser settings (" + + settings.getMaxCharsPerColumn() + "). "; + } + + if (tmp.contains("\n") || tmp.contains("\r")) { + tmp = displayLineSeparators(tmp, true); + String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false); + message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '" + + lineSeparator + "'. Parsed content:\n\t" + tmp; + } + + int nullCharacterCount = 0; + // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError + int maxLength = length > Integer.MAX_VALUE / 2 ? 
Integer.MAX_VALUE / 2 - 1 : length; + StringBuilder s = new StringBuilder(maxLength); + for (int i = 0; i < maxLength; i++) { + if (chars[i] == '\0') { + s.append('\\'); + s.append('0'); + nullCharacterCount++; + } else { + s.append(chars[i]); + } + } + tmp = s.toString(); + + if (nullCharacterCount > 0) { + message += "\nIdentified " + + nullCharacterCount + + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t" + + tmp; + } + } + + UserException.Builder builder; + if (ex instanceof UserException) { + builder = ((UserException) ex).rebuild(); + } else { + builder = UserException + .dataReadError(ex) + .message(message); + } + throw builder + .addContext("Line", context.currentLine()) + .addContext("Record", context.currentRecord()) + .build(logger); + } + + /** + * Finish the processing of a batch, indicates to the output + * interface to wrap up the batch + */ + public void finishBatch() { } + + /** + * Invoked once there are no more records and we are done with the + * current record reader to clean up state. + * @throws IOException for input file read errors + */ + public void close() throws IOException { + input.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java new file mode 100644 index 000000000..aced5adfd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Version 3 of the text reader. Hosts the "compliant" text reader on + * the row set framework. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index a379db175..37b8a4073 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -593,7 +593,6 @@ public class Foreman implements Runnable { new BasicOptimizer.BasicOptimizationContext(queryContext), plan); } - /** * Manages the end-state processing for Foreman. 
* @@ -726,7 +725,6 @@ public class Foreman implements Runnable { } } - @SuppressWarnings("resource") @Override public void close() { Preconditions.checkState(!isClosed); diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 7e9415530..5443eea5f 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -522,6 +522,8 @@ drill.exec.options: { exec.queue.memory_reserve_ratio: 0.2, exec.sort.disable_managed : false, exec.storage.enable_new_text_reader: true, + exec.storage.enable_v3_text_reader: false, + exec.storage.min_width: 1, exec.udf.enable_dynamic_support: true, exec.udf.use_dynamic: true, drill.exec.stats.logging.batch_size: false, diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java index e00e5dc60..87757782b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java @@ -128,6 +128,7 @@ public class TestPartitionFilter extends PlanTestBase { String query = "select * from dfs.`multilevel/parquet` where (dir0=1994 and dir1='Q1' and o_custkey < 500) or (dir0=1995 and dir1='Q2' and o_custkey > 500)"; testIncludeFilter(query, 2, "Filter\\(", 8); } + @Test //Parquet: partition filters are ANDed and belong to a top-level OR public void testPartitionFilter3_Parquet_from_CTAS() throws Exception { String query = "select * from dfs.tmp.parquet where (yr=1994 and qrtr='Q1' and o_custkey < 500) or (yr=1995 and qrtr='Q2' and o_custkey > 500)"; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java index e5ac6d873..89df5986f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java @@ -27,6 +27,7 @@ import static org.junit.Assert.fail; import java.util.Iterator; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -50,12 +51,14 @@ import org.apache.drill.exec.vector.VarCharVector; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Test the implementation of the Drill Volcano iterator protocol that * wraps the modular operator implementation. 
*/ +@Category(RowSetTests.class) public class TestOperatorRecordBatch extends SubOperatorTest { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubOperatorTest.class); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java index 88ccd3c41..2d066dee4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java @@ -64,7 +64,10 @@ public class TestColumnsArray extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); // ...and the columns array manager diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java index d1e91a283..2a5b00e9f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java @@ -232,7 +232,10 @@ public class TestColumnsArrayParser extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java index a6de5e6f3..bbc5e19a3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java @@ -45,7 +45,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partiton columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); // Simulate SELECT a, b, c ... @@ -70,7 +73,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); // Simulate SELECT a, fqn, filEPath, filename, suffix ... 
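For context on the implicit columns this projection exercises, a small sketch (invented names) of the values they conventionally resolve to for hdfs:///w/x/y/z.csv: fqn is the full path including the file name, filepath the containing directory, filename the base name, and suffix the file extension.

public class ImplicitColumnsDemo {
  public static void main(String[] args) {
    String fqn = "hdfs:///w/x/y/z.csv";
    String filepath = fqn.substring(0, fqn.lastIndexOf('/'));           // hdfs:///w/x/y
    String filename = fqn.substring(fqn.lastIndexOf('/') + 1);          // z.csv
    String suffix = filename.substring(filename.lastIndexOf('.') + 1);  // csv
    System.out.println(fqn + " | " + filepath + " | " + filename + " | " + suffix);
  }
}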
@@ -114,7 +120,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); String dir0 = ScanTestUtils.partitionColName(0); @@ -146,7 +155,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -171,6 +183,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { true, // Use legacy wildcard expansion true, // Put partitions at end new Path("hdfs:///w"), + 3, // Max partition depth is 3, though this "scan" sees only 2 Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -178,12 +191,14 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Lists.newArrayList(metadataManager.projectionParser())); List<ColumnProjection> cols = scanProj.columns(); - assertEquals(3, cols.size()); + assertEquals(4, cols.size()); assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType()); assertEquals(PartitionColumn.ID, cols.get(1).nodeType()); assertEquals(0, ((PartitionColumn) cols.get(1)).partition()); assertEquals(PartitionColumn.ID, cols.get(2).nodeType()); assertEquals(1, ((PartitionColumn) cols.get(2)).partition()); + assertEquals(PartitionColumn.ID, cols.get(3).nodeType()); + assertEquals(2, ((PartitionColumn) cols.get(3)).partition()); } /** @@ -199,6 +214,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { true, // Use legacy wildcard expansion false, // Put partitions at end new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -230,6 +246,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { true, // Use legacy wildcard expansion false, // Put partitions at end new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -262,7 +279,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -284,6 +304,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { true, // Use legacy wildcard expansion true, // Put partitions at end new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -310,6 +331,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { true, // Use legacy wildcard expansion false, // Put partitions at end new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, 
Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -343,6 +365,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { true, // Use legacy wildcard expansion true, // Put partitions at end new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -367,6 +390,7 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { true, // Use legacy wildcard expansion false, // Put partitions at end new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -394,7 +418,10 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java index 314bc2a13..dae89ab0a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java @@ -163,7 +163,10 @@ public class TestFileMetadataProjection extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partiton columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -249,7 +252,10 @@ public class TestFileMetadataProjection extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( @@ -300,7 +306,10 @@ public class TestFileMetadataProjection extends SubOperatorTest { Path filePath = new Path("hdfs:///x/0/1/2/3/4/5/6/7/8/9/10/d11/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///x"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanLevelProjection scanProj = new ScanLevelProjection( diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java index b05bb28f3..7b9fbfbe4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java @@ -103,6 +103,7 @@ public class TestFileScanFramework extends SubOperatorTest { public 
abstract static class BaseFileScanOpFixture extends AbstractScanOpFixture { protected Path selectionRoot = MOCK_ROOT_PATH; + protected int partitionDepth = 3; protected List<FileWork> files = new ArrayList<>(); protected Configuration fsConfig = new Configuration(); @@ -116,7 +117,7 @@ public class TestFileScanFramework extends SubOperatorTest { protected abstract BaseFileScanFramework<?> buildFramework(); private void configureFileScan(BaseFileScanFramework<?> framework) { - framework.setSelectionRoot(selectionRoot); + framework.setSelectionRoot(selectionRoot, partitionDepth); } } @@ -311,10 +312,11 @@ public class TestFileScanFramework extends SubOperatorTest { .add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR) .addNullable(ScanTestUtils.partitionColName(0), MinorType.VARCHAR) .addNullable(ScanTestUtils.partitionColName(1), MinorType.VARCHAR) + .addNullable(ScanTestUtils.partitionColName(2), MinorType.VARCHAR) .buildSchema(); SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(30, "fred", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1) - .addRow(40, "wilma", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1) + .addRow(30, "fred", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1, null) + .addRow(40, "wilma", MOCK_FILE_FQN, MOCK_FILE_PATH, MOCK_FILE_NAME, MOCK_SUFFIX, MOCK_DIR0, MOCK_DIR1, null) .build(); RowSetComparison verifier = new RowSetComparison(expected); assertEquals(expected.batchSchema(), scan.batchAccessor().getSchema()); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java index c7b52e2da..7ee91a504 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java @@ -59,7 +59,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator()); @@ -124,7 +127,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); scanner.withMetadata(metadataManager); @@ -193,7 +199,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); scanner.withMetadata(metadataManager); @@ -269,7 +278,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { Path filePath = new Path("hdfs:///w/x/y/z.csv"); 
FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePath)); scanner.withMetadata(metadataManager); @@ -334,7 +346,10 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { Path filePathB = new Path("hdfs:///w/x/b.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePathA, filePathB)); scanner.withMetadata(metadataManager); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java index 8adc0372a..cdfabc444 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java @@ -102,7 +102,10 @@ public class TestSchemaSmoothing extends SubOperatorTest { Path filePathB = new Path("hdfs:///w/x/y/b.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePathA, filePathB)); // Set up the scan level projection @@ -580,7 +583,10 @@ public class TestSchemaSmoothing extends SubOperatorTest { Path filePathB = new Path("hdfs:///w/x/y/b.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePathA, filePathB)); // Set up the scan level projection @@ -628,7 +634,10 @@ public class TestSchemaSmoothing extends SubOperatorTest { Path filePathB = new Path("hdfs:///w/x/b.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePathA, filePathB)); // Set up the scan level projection @@ -676,7 +685,10 @@ public class TestSchemaSmoothing extends SubOperatorTest { Path filePathB = new Path("hdfs:///w/x/y/b.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + false, // Don't expand partition columns for wildcard + false, // N/A new Path("hdfs:///w"), + FileMetadataManager.AUTO_PARTITION_DEPTH, Lists.newArrayList(filePathA, filePathB)); // Set up the scan level projection diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java index 838c889f8..9c8bc9dd3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.validate; import static 
org.junit.Assert.assertFalse; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader; import org.apache.drill.test.BaseDirTestWatcher; import org.apache.drill.test.ClientFixture; import org.apache.drill.test.ClusterFixture; @@ -49,7 +48,6 @@ public class TestValidationOptions extends DrillTest { .toConsole() .logger(BatchValidator.class, Level.TRACE) .logger(IteratorValidatorCreator.class, Level.TRACE) - .logger(CompliantTextRecordReader.class, Level.TRACE) .build(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java index 2966bd561..5fb046e32 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestProjectedTuple.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTuple; import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn; import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl; import org.apache.drill.exec.record.metadata.ProjectionType; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -353,9 +352,10 @@ public class TestProjectedTuple { assertEquals(ProjectionType.UNPROJECTED, projSet.projectionType("foo")); } - @Test - @Ignore("Drill syntax does not support map arrays") - public void testMapArray() { + //@Test + //@Ignore("Drill syntax does not support map arrays") + @SuppressWarnings("unused") + private void testMapArray() { RequestedTuple projSet = RequestedTupleImpl.parse( RowSetTestUtils.projectList("a[1].x")); List<RequestedColumn> cols = projSet.projections(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java new file mode 100644 index 000000000..056b8e45a --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; + +public class BaseCsvTest extends ClusterTest { + + protected static final String PART_DIR = "root"; + protected static final String NESTED_DIR = "nested"; + protected static final String ROOT_FILE = "first.csv"; + protected static final String NESTED_FILE = "second.csv"; + + protected static String validHeaders[] = { + "a,b,c", + "10,foo,bar" + }; + + protected static String secondFile[] = { + "a,b,c", + "20,fred,wilma" + }; + + protected static File testDir; + + protected static void setup(boolean skipFirstLine, boolean extractHeader) throws Exception { + setup(skipFirstLine, extractHeader, 1); + } + + protected static void setup(boolean skipFirstLine, boolean extractHeader, + int maxParallelization) throws Exception { + startCluster( + ClusterFixture.builder(dirTestWatcher) + .maxParallelization(maxParallelization)); + + // Set up CSV storage plugin using headers. + + TextFormatConfig csvFormat = new TextFormatConfig(); + csvFormat.fieldDelimiter = ','; + csvFormat.skipFirstLine = skipFirstLine; + csvFormat.extractHeader = extractHeader; + + testDir = cluster.makeDataDir("data", "csv", csvFormat); + } + + protected static void buildNestedTable() throws IOException { + + // Two-level partitioned table + + File rootDir = new File(testDir, PART_DIR); + rootDir.mkdir(); + buildFile(new File(rootDir, ROOT_FILE), validHeaders); + File nestedDir = new File(rootDir, NESTED_DIR); + nestedDir.mkdir(); + buildFile(new File(nestedDir, NESTED_FILE), secondFile); + } + + protected void enableV3(boolean enable) { + client.alterSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY, enable); + } + + protected void resetV3() { + client.resetSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY); + } + + protected static void buildFile(String fileName, String[] data) throws IOException { + buildFile(new File(testDir, fileName), data); + } + + protected static void buildFile(File file, String[] data) throws IOException { + try(PrintWriter out = new PrintWriter(new FileWriter(file))) { + for (String line : data) { + out.println(line); + } + } + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java deleted file mode 100644 index 25aa738eb..000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.easy.text.compliant; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; - -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.record.metadata.SchemaBuilder; -import org.apache.drill.exec.record.metadata.TupleMetadata; -import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig; -import org.apache.drill.test.ClusterFixture; -import org.apache.drill.test.ClusterTest; -import org.apache.drill.test.rowSet.RowSet; -import org.apache.drill.test.rowSet.RowSetBuilder; -import org.apache.drill.test.rowSet.RowSetUtilities; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * SQL-level tests for CSV headers. See - * {@link TestHeaderBuilder} for detailed unit tests. - * This test does not attempt to duplicate all the cases - * from the unit tests; instead it just does a sanity check. - */ - -public class TestCsv extends ClusterTest { - - private static final String CASE2_FILE_NAME = "case2.csv"; - - private static File testDir; - - @BeforeClass - public static void setup() throws Exception { - startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1)); - - // Set up CSV storage plugin using headers. - - TextFormatConfig csvFormat = new TextFormatConfig(); - csvFormat.fieldDelimiter = ','; - csvFormat.skipFirstLine = false; - csvFormat.extractHeader = true; - - testDir = cluster.makeDataDir("data", "csv", csvFormat); - buildFile(CASE2_FILE_NAME, validHeaders); - } - - private static String emptyHeaders[] = { - "", - "10,foo,bar" - }; - - @Test - public void testEmptyCsvHeaders() throws IOException { - String fileName = "case1.csv"; - buildFile(fileName, emptyHeaders); - try { - client.queryBuilder().sql(makeStatement(fileName)).run(); - fail(); - } catch (Exception e) { - assertTrue(e.getMessage().contains("must define at least one header")); - } - } - - private static String validHeaders[] = { - "a,b,c", - "10,foo,bar" - }; - - @Test - public void testValidCsvHeaders() throws IOException { - RowSet actual = client.queryBuilder().sql(makeStatement(CASE2_FILE_NAME)).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .add("b", MinorType.VARCHAR) - .add("c", MinorType.VARCHAR) - .buildSchema(); - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", "foo", "bar") - .build(); - RowSetUtilities.verify(expected, actual); - } - - private static String invalidHeaders[] = { - "$,,9b,c,c,c_2", - "10,foo,bar,fourth,fifth,sixth" - }; - - @Test - public void testInvalidCsvHeaders() throws IOException { - String fileName = "case3.csv"; - buildFile(fileName, invalidHeaders); - RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("column_1", MinorType.VARCHAR) - .add("column_2", MinorType.VARCHAR) - .add("col_9b", MinorType.VARCHAR) - .add("c", MinorType.VARCHAR) - .add("c_2", MinorType.VARCHAR) - .add("c_2_2", MinorType.VARCHAR) - .buildSchema(); - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", "foo", "bar", "fourth", "fifth", "sixth") - .build(); - RowSetUtilities.verify(expected, actual); - } - - // Test fix for DRILL-5590 - @Test - 
public void testCsvHeadersCaseInsensitive() throws IOException { - String sql = "SELECT A, b, C FROM `dfs.data`.`%s`"; - RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("A", MinorType.VARCHAR) - .add("b", MinorType.VARCHAR) - .add("C", MinorType.VARCHAR) - .buildSchema(); - - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", "foo", "bar") - .build(); - RowSetUtilities.verify(expected, actual); - } - - private String makeStatement(String fileName) { - return "SELECT * FROM `dfs.data`.`" + fileName + "`"; - } - - private static void buildFile(String fileName, String[] data) throws IOException { - try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) { - for (String line : data) { - out.println(line); - } - } - } - - /** - * Verify that the wildcard expands columns to the header names, including - * case - */ - @Test - public void testWildcard() throws IOException { - String sql = "SELECT * FROM `dfs.data`.`%s`"; - RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .add("b", MinorType.VARCHAR) - .add("c", MinorType.VARCHAR) - .buildSchema(); - - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", "foo", "bar") - .build(); - RowSetUtilities.verify(expected, actual); - } - - /** - * Verify that implicit columns are recognized and populated. Sanity test - * of just one implicit column. - */ - @Test - public void testImplicitColsExplicitSelect() throws IOException { - String sql = "SELECT A, filename FROM `dfs.data`.`%s`"; - RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("A", MinorType.VARCHAR) - .addNullable("filename", MinorType.VARCHAR) - .buildSchema(); - - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", CASE2_FILE_NAME) - .build(); - RowSetUtilities.verify(expected, actual); - } - - /** - * Verify that implicit columns are recognized and populated. Sanity test - * of just one implicit column. - */ - @Test - public void testImplicitColsWildcard() throws IOException { - String sql = "SELECT *, filename FROM `dfs.data`.`%s`"; - RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .add("b", MinorType.VARCHAR) - .add("c", MinorType.VARCHAR) - .addNullable("filename", MinorType.VARCHAR) - .buildSchema(); - - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", "foo", "bar", CASE2_FILE_NAME) - .build(); - RowSetUtilities.verify(expected, actual); - } - - /** - * CSV does not allow explicit use of dir0, dir1, etc. columns. Treated - * as undefined nullable int columns. - * <p> - * Note that the class path storage plugin does not support directories - * (partitions). It is unclear if that should show up here as the - * partition column names being undefined (hence Nullable INT) or should - * they still be defined, but set to a null Nullable VARCHAR? 
- */ - @Test - public void testPartitionColsWildcard() throws IOException { - String sql = "SELECT *, dir0, dir5 FROM `dfs.data`.`%s`"; - RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .add("b", MinorType.VARCHAR) - .add("c", MinorType.VARCHAR) - .addNullable("dir0", MinorType.INT) - .addNullable("dir5", MinorType.INT) - .buildSchema(); - - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", "foo", "bar", null, null) - .build(); - RowSetUtilities.verify(expected, actual); - } - - /** - * CSV does not allow explicit use of dir0, dir1, etc. columns. Treated - * as undefined nullable int columns. - */ - @Test - public void testPartitionColsExplicit() throws IOException { - String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`"; - RowSet actual = client.queryBuilder().sql(sql, CASE2_FILE_NAME).rowSet(); - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .addNullable("dir0", MinorType.INT) - .addNullable("dir5", MinorType.INT) - .buildSchema(); - - RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("10", null, null) - .build(); - RowSetUtilities.verify(expected, actual); - } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java new file mode 100644 index 000000000..d983f87e3 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant; + +import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; +import java.io.File; +import java.io.IOException; + +import org.apache.drill.categories.RowSetTests; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetUtilities; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +// CSV reader now hosted on the row set framework +@Category(RowSetTests.class) +public class TestCsvIgnoreHeaders extends BaseCsvTest{ + + private static String withHeaders[] = { + "a,b,c", + "10,foo,bar", + "20,fred,wilma" + }; + + private static String raggedRows[] = { + "a,b,c", + "10,dino", + "20,foo,bar", + "30" + }; + + @BeforeClass + public static void setup() throws Exception { + BaseCsvTest.setup(true, false); + } + + @Test + public void testColumns() throws IOException { + try { + enableV3(false); + doTestColumns(); + enableV3(true); + doTestColumns(); + } finally { + resetV3(); + } + } + + private void doTestColumns() throws IOException { + String fileName = "simple.csv"; + buildFile(fileName, withHeaders); + String sql = "SELECT columns FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addSingleCol(strArray("10", "foo", "bar")) + .addSingleCol(strArray("20", "fred", "wilma")) + .build(); + RowSetUtilities.verify(expected, actual); + } + + @Test + public void testRaggedRows() throws IOException { + try { + enableV3(false); + doTestRaggedRows(); + enableV3(true); + doTestRaggedRows(); + } finally { + resetV3(); + } + } + + private void doTestRaggedRows() throws IOException { + String fileName = "ragged.csv"; + TestCsvWithHeaders.buildFile(new File(testDir, fileName), raggedRows); + String sql = "SELECT columns FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addSingleCol(strArray("10", "dino")) + .addSingleCol(strArray("20", "foo", "bar")) + .addSingleCol(strArray("30")) + .build(); + RowSetUtilities.verify(expected, actual); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java new file mode 100644 index 000000000..655d04db6 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.drill.categories.RowSetTests; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.rowSet.DirectRowSet; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.RowSetUtilities; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Sanity test of CSV files with headers. Tests both the original + * "compliant" version and the V3 version based on the row set + * framework. + * <p> + * The CSV reader is a "canary in the coal mine" for many scan features. + * It turns out that there are several bugs in "V2" (AKA "new text reader") + * that are fixed in "V3" (the one based on the row set framework), and one + * that is not yet fixed. + * + * <ul> + * <li>Ragged rows will crash the V2 text reader when headers are used. + * No V2 test exists as a result. Fixed in V3.</li> + * <li>DRILL-7083: in V2, if files are nested to 2 levels, but we ask + * for dir2 (the non-existent third level), the type of dir2 will be + * nullable INT. In V3, the type is Nullable VARCHAR (just like for the + * existing partition levels.)</li> + * <li>DRILL-7080: A query like SELECT *, dir0 produces the result schema + * of (dir0, a, b, ...) in V2 and (a, b, ... dir0, dir00) in V3. This + * seems to be a bug in the Project operator.</li> + * </ul> + * + * The V3 tests all demonstrate that the row set scan framework + * delivers a first empty batch from each scan. I (Paul) had understood + * that we had a "fast schema" path as the result of the "empty batch" + * project. However, the V2 reader does not provide the schema-only + * first batch. So, not sure if doing so is a feature, or a bug because + * things changed. Easy enough to change if we choose to. If so, the + * tests here would remove the test for that schema-only batch. + * <p> + * Tests are run for both V2 and V3. When the results are the same, + * the test occurs once, wrapped in a "driver" to select V2 or V3 mode. + * When behavior differs, there are separate tests for V2 and V3. + * <p> + * The V2 tests are temporary. Once we accept that V3 is stable, we + * can remove V2 (and the "old text reader.") The behavior in V3 is + * more correct, no reason to keep the old, broken behavior. 
+ * + * @see {@link TestHeaderBuilder} + */ + +// CSV reader now hosted on the row set framework +@Category(RowSetTests.class) +public class TestCsvWithHeaders extends BaseCsvTest { + + private static final String TEST_FILE_NAME = "case2.csv"; + + private static String invalidHeaders[] = { + "$,,9b,c,c,c_2", + "10,foo,bar,fourth,fifth,sixth" + }; + + private static String emptyHeaders[] = { + "", + "10,foo,bar" + }; + + private static String raggedRows[] = { + "a,b,c", + "10,dino", + "20,foo,bar", + "30" + }; + + @BeforeClass + public static void setup() throws Exception { + BaseCsvTest.setup(false, true); + buildFile(TEST_FILE_NAME, validHeaders); + buildNestedTable(); + } + + private static final String EMPTY_FILE = "empty.csv"; + + @Test + public void testEmptyFile() throws IOException { + buildFile(EMPTY_FILE, new String[] {}); + try { + enableV3(false); + doTestEmptyFile(); + enableV3(true); + doTestEmptyFile(); + } finally { + resetV3(); + } + } + + private void doTestEmptyFile() throws IOException { + RowSet rowSet = client.queryBuilder().sql(makeStatement(EMPTY_FILE)).rowSet(); + assertNull(rowSet); + } + + private static final String EMPTY_HEADERS_FILE = "noheaders.csv"; + + /** + * Trivial case: empty header. This case should fail. + */ + + @Test + public void testEmptyCsvHeaders() throws IOException { + buildFile(EMPTY_HEADERS_FILE, emptyHeaders); + try { + enableV3(false); + doTestEmptyCsvHeaders(); + enableV3(true); + doTestEmptyCsvHeaders(); + } finally { + resetV3(); + } + } + + private void doTestEmptyCsvHeaders() throws IOException { + try { + client.queryBuilder().sql(makeStatement(EMPTY_HEADERS_FILE)).run(); + fail(); + } catch (Exception e) { + assertTrue(e.getMessage().contains("must define at least one header")); + } + } + + @Test + public void testValidCsvHeaders() throws IOException { + try { + enableV3(false); + doTestValidCsvHeaders(); + enableV3(true); + doTestValidCsvHeaders(); + } finally { + resetV3(); + } + } + + private void doTestValidCsvHeaders() throws IOException { + RowSet actual = client.queryBuilder().sql(makeStatement(TEST_FILE_NAME)).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar") + .build(); + RowSetUtilities.verify(expected, actual); + } + + @Test + public void testInvalidCsvHeaders() throws IOException { + try { + enableV3(false); + doTestInvalidCsvHeaders(); + enableV3(true); + doTestInvalidCsvHeaders(); + } finally { + resetV3(); + } + } + + private void doTestInvalidCsvHeaders() throws IOException { + String fileName = "case3.csv"; + buildFile(fileName, invalidHeaders); + RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("column_1", MinorType.VARCHAR) + .add("column_2", MinorType.VARCHAR) + .add("col_9b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .add("c_2", MinorType.VARCHAR) + .add("c_2_2", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", "fourth", "fifth", "sixth") + .build(); + RowSetUtilities.verify(expected, actual); + } + + @Test + public void testCsvHeadersCaseInsensitive() throws IOException { + try { + enableV3(false); + doTestCsvHeadersCaseInsensitive(); + enableV3(true); + doTestCsvHeadersCaseInsensitive(); + } 
finally { + resetV3(); + } + } + + // Test fix for DRILL-5590 + private void doTestCsvHeadersCaseInsensitive() throws IOException { + String sql = "SELECT A, b, C FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("C", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar") + .build(); + RowSetUtilities.verify(expected, actual); + } + + private String makeStatement(String fileName) { + return "SELECT * FROM `dfs.data`.`" + fileName + "`"; + } + + @Test + public void testWildcard() throws IOException { + try { + enableV3(false); + doTestWildcard(); + enableV3(true); + doTestWildcard(); + } finally { + resetV3(); + } + } + + /** + * Verify that the wildcard expands columns to the header names, including + * case + */ + private void doTestWildcard() throws IOException { + String sql = "SELECT * FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar") + .build(); + RowSetUtilities.verify(expected, actual); + } + + /** + * Verify that implicit columns are recognized and populated. Sanity test + * of just one implicit column. V2 uses nullable VARCHAR for file + * metadata columns. + */ + + @Test + public void testImplicitColsExplicitSelectV2() throws IOException { + try { + enableV3(false); + String sql = "SELECT A, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .addNullable("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + /** + * Verify that implicit columns are recognized and populated. Sanity test + * of just one implicit column. V3 uses non-nullable VARCHAR for file + * metadata columns. + */ + + @Test + public void testImplicitColsExplicitSelectV3() throws IOException { + try { + enableV3(true); + String sql = "SELECT A, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .add("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + /** + * Verify that implicit columns are recognized and populated. Sanity test + * of just one implicit column. V2 uses nullable VARCHAR for file + * metadata columns. 
 + */ + + @Test + public void testImplicitColWildcardV2() throws IOException { + try { + enableV3(false); + String sql = "SELECT *, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + /** + * Verify that implicit columns are recognized and populated. Sanity test + * of just one implicit column. V3 uses non-nullable VARCHAR for file + * metadata columns. + */ + + @Test + public void testImplicitColWildcardV3() throws IOException { + try { + enableV3(true); + String sql = "SELECT *, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .add("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + @Test + public void testColsWithWildcard() throws IOException { + try { + enableV3(false); + doTestColsWithWildcard(); + enableV3(true); + doTestColsWithWildcard(); + } finally { + resetV3(); + } + } + + private void doTestColsWithWildcard() throws IOException { + String sql = "SELECT *, a as d FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", "10") + .build(); + RowSetUtilities.verify(expected, actual); + } + + /** + * V2 does not allow explicit use of dir0, dir1, etc. columns for a non-partitioned + * file. Treated as undefined nullable int columns. + */ + + @Test + public void testPartitionColsExplicitV2() throws IOException { + try { + enableV3(false); + String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .addNullable("dir0", MinorType.INT) + .addNullable("dir5", MinorType.INT) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", null, null) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + /** + * V3 allows the use of partition columns, even for a non-partitioned file. + * The columns are null of type Nullable VARCHAR. This area of Drill + * is a bit murky: it seems reasonable to support partition columns consistently + * rather than conditionally based on the structure of the input. 
+ */ + @Test + public void testPartitionColsExplicitV3() throws IOException { + try { + enableV3(true); + String sql = "SELECT a, dir0, dir5 FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .addNullable("dir5", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", null, null) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + @Test + public void testDupColumn() throws IOException { + try { + enableV3(false); + doTestDupColumn(); + enableV3(true); + doTestDupColumn(); + } finally { + resetV3(); + } + } + + private void doTestDupColumn() throws IOException { + String sql = "SELECT a, b, a FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("a0", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "10") + .build(); + RowSetUtilities.verify(expected, actual); + } + + // This test cannot be run for V2. The data gets corrupted and we get + // internal errors. + + /** + * Test that ragged rows result in the "missing" columns being filled + * in with the moral equivalent of a null column for CSV: a blank string. + */ + @Test + public void testRaggedRowsV3() throws IOException { + try { + enableV3(true); + String fileName = "case4.csv"; + buildFile(fileName, raggedRows); + RowSet actual = client.queryBuilder().sql(makeStatement(fileName)).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "dino", "") + .addRow("20", "foo", "bar") + .addRow("30", "", "") + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + /** + * Test partition expansion. Because the two files are read in the + * same scan operator, the schema is consistent. See + * {@link TestPartitionRace} for the multi-threaded race where all + * hell breaks loose. + * <p> + * V2, since Drill 1.12, puts partition columns ahead of data columns. + */ + @Test + public void testPartitionExpansionV2() throws IOException { + try { + enableV3(false); + + String sql = "SELECT * FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addNullable("dir0", MinorType.VARCHAR) + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .buildSchema(); + + // Read the two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + + // Figure out which record this is and test accordingly. 
+ + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String col2 = reader.scalar(1).getString(); + if (col2.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(null, "10", "foo", "bar") + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(NESTED_DIR, "20", "fred", "wilma") + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } + + /** + * Test partition expansion in V3. + * <p> + * This test is tricky because it will return two data batches + * (preceded by an empty schema batch.) File read order is random + * so we have to expect the files in either order. + * <p> + * V3, as in V2 before Drill 1.12, puts partition columns after + * data columns (so that data columns don't shift positions if + * files are nested to another level.) + */ + @Test + public void testPartitionExpansionV3() throws IOException { + try { + enableV3(true); + + String sql = "SELECT * FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .buildSchema(); + + // First batch is empty; just carries the schema. + + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + assertEquals(0, rowSet.rowCount()); + rowSet.clear(); + + // Read the other two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + rowSet = iter.next(); + + // Figure out which record this is and test accordingly. + + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String col1 = reader.scalar(0).getString(); + if (col1.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("20", "fred", "wilma", NESTED_DIR) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } + + /** + * Test the use of partition columns with the wildcard. This works for file + * metadata columns, but confuses the project operator when used for + * partition columns. DRILL-7080. + */ + @Test + public void testWilcardAndPartitionsMultiFilesV2() throws IOException { + try { + enableV3(false); + + String sql = "SELECT *, dir0, dir1 FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addNullable("dir0", MinorType.VARCHAR) + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("dir00", MinorType.VARCHAR) + .addNullable("dir1", MinorType.INT) + .buildSchema(); + + // Read the two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + + // Figure out which record this is and test accordingly. 
 + + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String aCol = reader.scalar("a").getString(); + if (aCol.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(null, "10", "foo", "bar", null, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(NESTED_DIR, "20", "fred", "wilma", NESTED_DIR, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } + + /** + * Test the use of partition columns with the wildcard. This works for file + * metadata columns, but confuses the project operator when used for + * partition columns. DRILL-7080. Still broken in V3 because this appears + * to be a Project operator issue, not a reader issue. Note that the + * partition column moves after data columns. + */ + @Test + public void testWilcardAndPartitionsMultiFilesV3() throws IOException { + try { + enableV3(true); + + String sql = "SELECT *, dir0, dir1 FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .addNullable("dir1", MinorType.VARCHAR) + .addNullable("dir00", MinorType.VARCHAR) + .addNullable("dir10", MinorType.VARCHAR) + .buildSchema(); + + // First batch is empty; just carries the schema. + + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(), + rowSet); + + // Read the two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + rowSet = iter.next(); + + // Figure out which record this is and test accordingly. + + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String aCol = reader.scalar("a").getString(); + if (aCol.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", null, null, null, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("20", "fred", "wilma", NESTED_DIR, null, NESTED_DIR, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } + + /** + * Test using partition columns with partitioned files in V2. Since the + * file is nested to one level, dir0 is a nullable VARCHAR, but dir1 is + * a nullable INT. Since both files are read in a single scan operator, + * the schema is consistent. + */ + @Test + public void doTestExplicitPartitionsMultiFilesV2() throws IOException { + try { + enableV3(false); + + String sql = "SELECT a, b, c, dir0, dir1 FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .addNullable("dir1", MinorType.INT) + .buildSchema(); + + // Read the two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + + // Figure out which record this is and test accordingly. 
+ + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String aCol = reader.scalar("a").getString(); + if (aCol.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", null, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("20", "fred", "wilma", NESTED_DIR, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } + + /** + * Test using partition columns with partitioned files in V3. Although the + * file is nested to one level, both dir0 and dir1 are nullable VARCHAR. + * See {@link TestPartitionRace} to show that the types and schemas + * are consistent even when used across multiple scans. + */ + @Test + public void doTestExplicitPartitionsMultiFilesV3() throws IOException { + try { + enableV3(true); + + String sql = "SELECT a, b, c, dir0, dir1 FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .addNullable("dir1", MinorType.VARCHAR) + .buildSchema(); + + // First batch is empty; just carries the schema. + + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + RowSetUtilities.verify(new RowSetBuilder(client.allocator(), expectedSchema).build(), + rowSet); + + // Read the two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + rowSet = iter.next(); + + // Figure out which record this is and test accordingly. + + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String aCol = reader.scalar("a").getString(); + if (aCol.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", null, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("20", "fred", "wilma", NESTED_DIR, null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } + finally { + resetV3(); + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java new file mode 100644 index 000000000..6051875f2 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.drill.categories.RowSetTests; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ArrayReader; +import org.apache.drill.test.rowSet.DirectRowSet; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.RowSetUtilities; + +import static org.apache.drill.test.rowSet.RowSetUtilities.strArray; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +// CSV reader now hosted on the row set framework +@Category(RowSetTests.class) +public class TestCsvWithoutHeaders extends BaseCsvTest { + + private static final String TEST_FILE_NAME = "simple.csv"; + + private static String sampleData[] = { + "10,foo,bar", + "20,fred,wilma" + }; + + private static String raggedRows[] = { + "10,dino", + "20,foo,bar", + "30" + }; + + private static String secondSet[] = { + "30,barney,betty" + }; + + @BeforeClass + public static void setup() throws Exception { + BaseCsvTest.setup(false, false); + + buildFile(TEST_FILE_NAME, sampleData); + buildNestedTableWithoutHeaders(); + } + + protected static void buildNestedTableWithoutHeaders() throws IOException { + + // Two-level partitioned table + + File rootDir = new File(testDir, PART_DIR); + rootDir.mkdir(); + buildFile(new File(rootDir, ROOT_FILE), sampleData); + File nestedDir = new File(rootDir, NESTED_DIR); + nestedDir.mkdir(); + buildFile(new File(nestedDir, NESTED_FILE), secondSet); + } + + @Test + public void testWildcard() throws IOException { + try { + enableV3(false); + doTestWildcard(); + enableV3(true); + doTestWildcard(); + } finally { + resetV3(); + } + } + + /** + * Verify that the wildcard expands to the `columns` array + */ + + private void doTestWildcard() throws IOException { + String sql = "SELECT * FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addSingleCol(strArray("10", "foo", "bar")) + .addSingleCol(strArray("20", "fred", "wilma")) + .build(); + RowSetUtilities.verify(expected, actual); + } + + @Test + public void testColumns() throws IOException { + try { + enableV3(false); + doTestColumns(); + enableV3(true); + doTestColumns(); + } finally { + resetV3(); + } + } + + private void doTestColumns() throws IOException { + String sql = "SELECT columns FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addSingleCol(strArray("10", "foo", "bar")) + .addSingleCol(strArray("20", "fred", "wilma")) + .build(); + 
RowSetUtilities.verify(expected, actual); + } + + @Test + public void doTestWildcardAndMetadataV2() throws IOException { + try { + enableV3(false); + String sql = "SELECT *, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .addNullable("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME) + .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + @Test + public void doTestWildcardAndMetadataV3() throws IOException { + try { + enableV3(true); + String sql = "SELECT *, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .add("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME) + .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + @Test + public void testColumnsAndMetadataV2() throws IOException { + try { + enableV3(false); + String sql = "SELECT columns, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .addNullable("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME) + .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + @Test + public void testColumnsAndMetadataV3() throws IOException { + try { + enableV3(true); + String sql = "SELECT columns, filename FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .add("filename", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(strArray("10", "foo", "bar"), TEST_FILE_NAME) + .addRow(strArray("20", "fred", "wilma"), TEST_FILE_NAME) + .build(); + RowSetUtilities.verify(expected, actual); + } finally { + resetV3(); + } + } + + @Test + public void testSpecificColumns() throws IOException { + try { + enableV3(false); + doTestSpecificColumns(); + enableV3(true); + doTestSpecificColumns(); + } finally { + resetV3(); + } + } + + private void doTestSpecificColumns() throws IOException { + String sql = "SELECT columns[0], columns[2] FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, TEST_FILE_NAME).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addNullable("EXPR$0", MinorType.VARCHAR) + .addNullable("EXPR$1", MinorType.VARCHAR) + .buildSchema(); + + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "bar") + .addRow("20", "wilma") + .build(); + RowSetUtilities.verify(expected, actual); + } + + @Test + public void testRaggedRows() throws 
IOException { + try { + enableV3(false); + doTestRaggedRows(); + enableV3(true); + doTestRaggedRows(); + } finally { + resetV3(); + } + } + + private void doTestRaggedRows() throws IOException { + String fileName = "ragged.csv"; + buildFile(fileName, raggedRows); + String sql = "SELECT columns FROM `dfs.data`.`%s`"; + RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addSingleCol(strArray("10", "dino")) + .addSingleCol(strArray("20", "foo", "bar")) + .addSingleCol(strArray("30")) + .build(); + RowSetUtilities.verify(expected, actual); + } + + /** + * Test partition expansion. Because the two files are read in the + * same scan operator, the schema is consistent. + * <p> + * V2, since Drill 1.12, puts partition columns ahead of data columns. + */ + @Test + public void testPartitionExpansionV2() throws IOException { + try { + enableV3(false); + + String sql = "SELECT * FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addNullable("dir0", MinorType.VARCHAR) + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + + // Read the two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + + // Figure out which record this is and test accordingly. + + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + ArrayReader ar = reader.array(1); + assertTrue(ar.next()); + String col1 = ar.scalar().getString(); + if (col1.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(null, strArray("10", "foo", "bar")) + .addRow(null, strArray("20", "fred", "wilma")) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(NESTED_DIR, strArray("30", "barney", "betty")) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } + + /** + * Test partition expansion in V3. + * <p> + * V3, as in V2 before Drill 1.12, puts partition columns after + * data columns (so that data columns don't shift positions if + * files are nested to another level.) + */ + @Test + public void testPartitionExpansionV3() throws IOException { + try { + enableV3(true); + + String sql = "SELECT * FROM `dfs.data`.`%s`"; + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .buildSchema(); + + // First batch is empty; just carries the schema. + + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + assertEquals(0, rowSet.rowCount()); + rowSet.clear(); + + // Read the other two batches. + + for (int i = 0; i < 2; i++) { + assertTrue(iter.hasNext()); + rowSet = iter.next(); + + // Figure out which record this is and test accordingly. 
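+        // The two files can arrive in either order, so key off the first `columns` value ("10" for the root file, "30" for the nested file) rather than assuming a batch order.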
+ + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + ArrayReader ar = reader.array(0); + assertTrue(ar.next()); + String col1 = ar.scalar().getString(); + if (col1.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(strArray("10", "foo", "bar"), null) + .addRow(strArray("20", "fred", "wilma"), null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(strArray("30", "barney", "betty"), NESTED_DIR) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java index 20bf79652..67429fbd7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java @@ -21,25 +21,32 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.easy.text.compliant.v3.HeaderBuilder; import org.apache.drill.test.DrillTest; +import org.apache.hadoop.fs.Path; import org.junit.Test; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; +/** + * Test the mechanism that builds column names from a set of CSV + * headers. The mechanism provides reasonable defaults for missing + * or invalid headers. + */ + public class TestHeaderBuilder extends DrillTest { @Test public void testEmptyHeader() { - HeaderBuilder hb = new HeaderBuilder(); - hb.startBatch(); + Path dummyPath = new Path("file:/dummy.csv"); + HeaderBuilder hb = new HeaderBuilder(dummyPath); try { hb.finishRecord(); } catch (UserException e) { assertTrue(e.getMessage().contains("must define at least one header")); } - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,""); try { hb.finishRecord(); @@ -47,127 +54,107 @@ public class TestHeaderBuilder extends DrillTest { assertTrue(e.getMessage().contains("must define at least one header")); } - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb," "); validateHeader(hb, new String[] {"column_1"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,","); validateHeader(hb, new String[] {"column_1", "column_2"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb," , "); validateHeader(hb, new String[] {"column_1", "column_2"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"a, "); validateHeader(hb, new String[] {"a", "column_2"}); } @Test public void testWhiteSpace() { - HeaderBuilder hb = new HeaderBuilder(); - hb.startBatch(); + Path dummyPath = new Path("file:/dummy.csv"); + HeaderBuilder hb = new HeaderBuilder(dummyPath); parse(hb,"a"); validateHeader(hb, new String[] {"a"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb," a "); validateHeader(hb, new String[] {"a"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb," a "); validateHeader(hb, new String[] {"a"}); - hb = new 
HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"a,b,c"); validateHeader(hb, new String[] {"a","b","c"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb," a , b , c "); validateHeader(hb, new String[] {"a","b","c"}); } @Test public void testSyntax() { - HeaderBuilder hb = new HeaderBuilder(); - hb.startBatch(); + Path dummyPath = new Path("file:/dummy.csv"); + HeaderBuilder hb = new HeaderBuilder(dummyPath); parse(hb,"a_123"); validateHeader(hb, new String[] {"a_123"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"a_123_"); validateHeader(hb, new String[] {"a_123_"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"az09_"); validateHeader(hb, new String[] {"az09_"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"+"); validateHeader(hb, new String[] {"column_1"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"+,-"); validateHeader(hb, new String[] {"column_1", "column_2"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"+9a"); validateHeader(hb, new String[] {"col_9a"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"9a"); validateHeader(hb, new String[] {"col_9a"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"a+b"); validateHeader(hb, new String[] {"a_b"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"a_b"); validateHeader(hb, new String[] {"a_b"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"EXPR$0"); validateHeader(hb, new String[] {"EXPR_0"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"(_-^-_)"); validateHeader(hb, new String[] {"col_______"}); } @Test public void testUnicode() { - HeaderBuilder hb = new HeaderBuilder(); - hb.startBatch(); + Path dummyPath = new Path("file:/dummy.csv"); + HeaderBuilder hb = new HeaderBuilder(dummyPath); parse(hb,"Αθήνα"); validateHeader(hb, new String[] {"Αθήνα"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"Москва"); validateHeader(hb, new String[] {"Москва"}); - hb = new HeaderBuilder(); - hb.startBatch(); + hb = new HeaderBuilder(dummyPath); parse(hb,"Paris,Αθήνα,Москва"); validateHeader(hb, new String[] {"Paris","Αθήνα","Москва"}); } @@ -183,8 +170,8 @@ public class TestHeaderBuilder extends DrillTest { } private void testParser(String input, String[] expected) { - HeaderBuilder hb = new HeaderBuilder(); - hb.startBatch(); + Path dummyPath = new Path("file:/dummy.csv"); + HeaderBuilder hb = new HeaderBuilder(dummyPath); parse(hb,input); hb.finishRecord(); validateHeader(hb, expected); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java new file mode 100644 index 000000000..6e98339ee --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.rowSet.DirectRowSet; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.drill.test.rowSet.RowSetReader; +import org.apache.drill.test.rowSet.RowSetUtilities; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Demonstrates a race condition inherent in the way that partition + * columns are currently implemented. Two files: one at the root directory, + * one down one level. Parallelization is forced to two. (Most tests use + * small files and both files end up being read in the same scanner, which + * masks the problem shown here.) + * <p> + * Depending on which file is read first, the output row may start with + * or without the partition column. Once the column occurs, it will + * persist. + * <p> + * The solution is to figure out the max partition depth in the + * EasySubScan rather than in each scan operator. + * <p> + * The tests here exercise both the "V2" (AKA "new text reader"), which has + * many issues, and the "V3" (row-set-based version), which has the fixes. + * <p> + * See DRILL-7082 for the multi-scan race (fixed in V3), and + * DRILL-7083 for the problem with partition columns returning nullable INT + * (also fixed in V3.) + */ + +public class TestPartitionRace extends BaseCsvTest { + + @BeforeClass + public static void setup() throws Exception { + BaseCsvTest.setup(false, true, 2); + + // Two-level partitioned table + + File rootDir = new File(testDir, PART_DIR); + rootDir.mkdir(); + buildFile(new File(rootDir, "first.csv"), validHeaders); + File nestedDir = new File(rootDir, NESTED_DIR); + nestedDir.mkdir(); + buildFile(new File(nestedDir, "second.csv"), secondFile); + } + + /** + * Oddly, when run in a single fragment, the files occur in a + * stable order, the partition column always appears, and it appears in + * the first column position. + */ + @Test + public void testSingleScanV2() throws IOException { + String sql = "SELECT * FROM `dfs.data`.`%s`"; + + try { + enableV3(false); + + // No loop needed here: in a single fragment the scan order is stable + + boolean sawMissingPartition = false; + boolean sawPartitionFirst = false; + boolean sawPartitionLast = false; + + // Read the two batches.
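+      // With a single fragment, both files go through the same scan operator, so dir0 should show up in both batches.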
+ + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + for (int j = 0; j < 2; j++) { + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + + // Check location of partition column + + int posn = rowSet.schema().index("dir0"); + if (posn == -1) { + sawMissingPartition = true; + } else if (posn == 0) { + sawPartitionFirst = true; + } else { + sawPartitionLast = true; + } + rowSet.clear(); + } + assertFalse(iter.hasNext()); + + // When run in a single fragment, the partition column appears + // all the time, and is in the first column position. + + assertFalse(sawMissingPartition); + assertTrue(sawPartitionFirst); + assertFalse(sawPartitionLast); + } finally { + resetV3(); + client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY); + } + } + + /** + * V3 provides the same schema for the single- and multi-scan + * cases. + */ + @Test + public void testSingleScanV3() throws IOException { + String sql = "SELECT * FROM `dfs.data`.`%s`"; + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .buildSchema(); + + try { + enableV3(true); + + // Loop to run the query 10 times to verify no race + + // First batch is empty; just carries the schema. + + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + assertEquals(0, rowSet.rowCount()); + rowSet.clear(); + + // Read the two batches. + + for (int j = 0; j < 2; j++) { + assertTrue(iter.hasNext()); + rowSet = iter.next(); + + // Figure out which record this is and test accordingly. + + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String col1 = reader.scalar("a").getString(); + if (col1.equals("10")) { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("20", "fred", "wilma", NESTED_DIR) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + } finally { + resetV3(); + } + } + + /** + * When forced to run in two fragments, the fun really starts. The + * partition column (usually) appears in the last column position instead + * of the first. The partition may or may not occur in the first row + * depending on which file is read first. The result is that the + * other columns will jump around. If we tried to create an expected + * result set, we'd be frustrated because the schema randomly changes. + * <p> + * Just to be clear: this behavior is a bug, not a feature. But, it is + * an established baseline for the "V2" reader. + * <p> + * This is really a test (demonstration) of the wrong behavior. This test + * is pretty unreliable. In particular, the position of the partition column + * seems to randomly shift from first to last position across runs. + */ + @Test + public void testRaceV2() throws IOException { + String sql = "SELECT * FROM `dfs.data`.`%s`"; + + try { + enableV3(false); + + // Special test-only feature to force even small scans + // to use more than one thread. Requires that the max + // parallelization option be set when starting the cluster. 
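+      // A minimum reader width of 2 lets each of the two files be read by its own scan operator, which is what exposes the race.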
+ + client.alterSession(ExecConstants.MIN_READER_WIDTH_KEY, 2); + + // Loop to run the query 10 times, or until we see the race + + boolean sawRootFirst = false; + boolean sawNestedFirst = false; + boolean sawMissingPartition = false; + boolean sawPartitionFirst = false; + boolean sawPartitionLast = false; + for (int i = 0; i < 10; i++) { + + // Read the two batches. + + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + for (int j = 0; j < 2; j++) { + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + + // Check location of partition column + + int posn = rowSet.schema().index("dir0"); + if (posn == -1) { + sawMissingPartition = true; + } else if (posn == 0) { + sawPartitionFirst = true; + } else { + sawPartitionLast = true; + } + + // Figure out which record this is and test accordingly. + + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String col1 = reader.scalar("a").getString(); + if (col1.equals("10")) { + if (i == 0) { + sawRootFirst = true; + } + } else { + if (i == 0) { + sawNestedFirst = true; + } + } + rowSet.clear(); + } + assertFalse(iter.hasNext()); + if (sawMissingPartition && + sawPartitionFirst && + sawPartitionLast && + sawRootFirst && + sawNestedFirst) { + // The following should appear most of the time. + System.out.println("All variations occurred"); + return; + } + } + + // If you see this, maybe something got fixed. Or, maybe the + // min parallelization hack above stopped working. + // Or, you were just unlucky and can try the test again. + // We print messages, rather than using assertTrue, to avoid + // introducing a flaky test. + + System.out.println("Some variations did not occur"); + System.out.println(String.format("Missing partition: %s", sawMissingPartition)); + System.out.println(String.format("Partition first: %s", sawPartitionFirst)); + System.out.println(String.format("Partition last: %s", sawPartitionLast)); + System.out.println(String.format("Outer first: %s", sawRootFirst)); + System.out.println(String.format("Nested first: %s", sawNestedFirst)); + } finally { + resetV3(); + client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY); + } + } + + /** + * V3 computes partition depth in the group scan (which sees all files), and + * so the partition column count does not vary across scans. Also, V3 puts + * partition columns at the end of the row so that data columns don't + * "jump around" when files are shifted to a new partition depth. + */ + @Test + public void testNoRaceV3() throws IOException { + String sql = "SELECT * FROM `dfs.data`.`%s`"; + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .add("b", MinorType.VARCHAR) + .add("c", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .buildSchema(); + + try { + enableV3(true); + client.alterSession(ExecConstants.MIN_READER_WIDTH_KEY, 2); + + // Loop to run the query 10 times or until we see both files + // in the first position. + + boolean sawRootFirst = false; + boolean sawNestedFirst = false; + for (int i = 0; i < 10; i++) { + + // First batch is empty; just carries the schema. + + Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator(); + assertTrue(iter.hasNext()); + RowSet rowSet = iter.next(); + assertEquals(0, rowSet.rowCount()); + rowSet.clear(); + + // Read the two batches. + + for (int j = 0; j < 2; j++) { + assertTrue(iter.hasNext()); + rowSet = iter.next(); + + // Figure out which record this is and test accordingly. 
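+          // File read order still varies across runs, so identify the row by the value of column `a` before comparing it against the fixed V3 schema.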
+ + RowSetReader reader = rowSet.reader(); + assertTrue(reader.next()); + String col1 = reader.scalar("a").getString(); + if (col1.equals("10")) { + if (i == 0) { + sawRootFirst = true; + } + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("10", "foo", "bar", null) + .build(); + RowSetUtilities.verify(expected, rowSet); + } else { + if (i == 0) { + sawNestedFirst = true; + } + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("20", "fred", "wilma", NESTED_DIR) + .build(); + RowSetUtilities.verify(expected, rowSet); + } + } + assertFalse(iter.hasNext()); + if (sawRootFirst && + sawNestedFirst) { + // The following should appear most of the time. + System.out.println("Both variations occurred"); + return; + } + } + + // If you see this, maybe something got fixed. Or, maybe the + // min parallelization hack above stopped working. + // Or, you were just unlucky and can try the test again. + // We print messages, rather than using assertTrue, to avoid + // introducing a flaky test. + + System.out.println("Some variations did not occur"); + System.out.println(String.format("Outer first: %s", sawRootFirst)); + System.out.println(String.format("Nested first: %s", sawNestedFirst)); + } finally { + resetV3(); + client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY); + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java index 6eb9bbfbe..43ad0d86d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java @@ -61,6 +61,8 @@ public class TestNewTextReader extends BaseTestQuery { fail("Query should have failed"); } catch(UserRemoteException ex) { assertEquals(ErrorType.DATA_READ, ex.getErrorType()); + // Change to the following if V3 is enabled + // assertEquals(ErrorType.VALIDATION, ex.getErrorType()); assertTrue("Error message should contain " + COL_NAME, ex.getMessage().contains(COL_NAME)); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java index a9d2977b4..601356e28 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java @@ -525,7 +525,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { public static final String EXPLAIN_PLAN_JSON = "json"; public static ClusterFixtureBuilder builder(BaseDirTestWatcher dirTestWatcher) { - ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher) + ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher) .sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE); Properties props = new Properties(); props.putAll(ClusterFixture.TEST_CONFIGURATIONS); diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java index 629714b36..63b818e2e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java @@ -366,7 +366,7 @@ public class QueryBuilder { } } - public QueryRowSetIterator rowSetIterator( ) { + public QueryRowSetIterator rowSetIterator() { return new 
QueryRowSetIterator(client.allocator(), withEventListener()); }