diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec')
37 files changed, 3153 insertions, 361 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 93adda17c..25ac2c9f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -713,6 +713,14 @@ public final class ExecConstants { public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY, new OptionDescription("Enables the text reader that complies with the RFC 4180 standard for text/csv files.")); + public static final String ENABLE_V3_TEXT_READER_KEY = "exec.storage.enable_v3_text_reader"; + public static final OptionValidator ENABLE_V3_TEXT_READER = new BooleanValidator(ENABLE_V3_TEXT_READER_KEY, + new OptionDescription("Enables the row set based version of the text/csv reader.")); + + public static final String MIN_READER_WIDTH_KEY = "exec.storage.min_width"; + public static final OptionValidator MIN_READER_WIDTH = new LongValidator(MIN_READER_WIDTH_KEY, + new OptionDescription("Min width for text readers, mostly for testing.")); + public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json"; public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java index 84aebdcd0..ebd7288a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java @@ -31,7 +31,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; public abstract class AbstractExchange extends AbstractSingle implements Exchange { - static final 
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class); // Ephemeral info for generating execution fragments. protected int senderMajorFragmentId; @@ -77,7 +76,7 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang } /** - * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrances + * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrences * in given endpoint list. * * @param fragmentEndpoints Drillbit endpoint assignments of fragments. @@ -111,7 +110,6 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang setupSenders(senderLocations); } - @Override public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { this.receiverMajorFragmentId = majorFragmentId; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java index 3bb1e5403..a5229f9d0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractFileGroupScan.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.drill.exec.store.dfs.FileSelection; public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class); public AbstractFileGroupScan(String userName) { super(userName); @@ -46,5 +45,4 @@ public abstract class AbstractFileGroupScan extends AbstractGroupScan implements public boolean supportsPartitionFilterPushdown() { return true; } - } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java index d744db424..db4c73d9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java @@ -25,8 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -public abstract class AbstractSubScan extends AbstractBase implements SubScan{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class); +public abstract class AbstractSubScan extends AbstractBase implements SubScan { public AbstractSubScan(String userName) { super(userName); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 063e26bef..1abc3d804 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -499,7 +499,6 @@ public class ScanBatch implements CloseableRecordBatch { schemaChanged = false; } - @SuppressWarnings("resource") private <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz, boolean isImplicitField) throws SchemaChangeException { Map<String, ValueVector> fieldVectorMap; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index c9d97bf72..436aea06b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -42,7 +42,6 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.DrillFuncHolderExpr; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; @@ -77,21 +76,21 @@ import java.util.List; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); + + private static final String EMPTY_STRING = ""; + private Projector projector; private List<ValueVector> allocationVectors; private List<ComplexWriter> complexWriters; private List<FieldReference> complexFieldReferencesList; private boolean hasRemainder = false; - private int remainderIndex = 0; + private int remainderIndex; private int recordCount; - private ProjectMemoryManager memoryManager; - - - private static final String EMPTY_STRING = ""; private boolean first = true; private boolean wasNone = false; // whether a NONE iter outcome was already seen + private ColumnExplorer columnExplorer; private class ClassifierResult { public boolean isStar = false; @@ -114,6 +113,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); + columnExplorer = new ColumnExplorer(context.getOptions()); } 
@Override @@ -121,14 +121,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { return recordCount; } - @Override protected void killIncoming(final boolean sendUpstream) { super.killIncoming(sendUpstream); hasRemainder = false; } - @Override public IterOutcome innerNext() { if (wasNone) { @@ -145,7 +143,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { @Override public VectorContainer getOutgoingContainer() { - return this.container; + return container; } @Override @@ -204,14 +202,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } incomingRecordCount = incoming.getRecordCount(); memoryManager.update(); - logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}", + logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}", memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this); } } } if (complexWriters != null && getLastKnownOutcome() == EMIT) { - throw new UnsupportedOperationException("Currently functions producing complex types as output is not " + + throw new UnsupportedOperationException("Currently functions producing complex types as output are not " + "supported in project list for subquery between LATERAL and UNNEST. 
Please re-write the query using this " + "function in the projection list of outermost query."); } @@ -219,7 +217,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { first = false; container.zeroVectors(); - int maxOuputRecordCount = memoryManager.getOutputRowCount(); logger.trace("doWork():[2] memMgr RC {}, incoming rc {}, incoming {}, project {}", memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this); @@ -233,7 +230,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { long projectEndTime = System.currentTimeMillis(); logger.trace("doWork(): projection: records {}, time {} ms", outputRecords, (projectEndTime - projectStartTime)); - if (outputRecords < incomingRecordCount) { setValueCount(outputRecords); hasRemainder = true; @@ -277,7 +273,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final int projRecords = projector.projectRecords(this.incoming, remainderIndex, recordsToProcess, 0); long projectEndTime = System.currentTimeMillis(); - logger.trace("handleRemainder: projection: " + "records {}, time {} ms", projRecords,(projectEndTime - projectStartTime)); + logger.trace("handleRemainder: projection: records {}, time {} ms", projRecords,(projectEndTime - projectStartTime)); if (projRecords < remainingRecordCount) { setValueCount(projRecords); @@ -463,7 +459,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); memoryManager.addNewField(vv, write); - final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); + cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); } } continue; @@ -546,7 +542,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final TypedFieldId fid = 
container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName())); final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector); final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe); - final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); + cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND); memoryManager.addNewField(ouputVector, write); // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector. @@ -590,7 +586,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } private boolean isImplicitFileColumn(ValueVector vvIn) { - return ColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null; + return columnExplorer.isImplicitFileColumn(vvIn.getField().getName()); } private List<NamedExpression> getExpressionList() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java index 966a03901..b9b3e782e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java @@ -24,7 +24,7 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.Scan import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn; import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl; import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn; -import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput; +import org.apache.drill.exec.store.easy.text.compliant.v3.TextReader; /** * Parses the `columns` array. Doing so is surprisingly complex. 
@@ -113,14 +113,14 @@ public class ColumnsArrayParser implements ScanProjectionParser { if (inCol.isArray()) { int maxIndex = inCol.maxIndex(); - if (maxIndex > RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS) { + if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) { throw UserException .validationError() .message(String.format( "`columns`[%d] index out of bounds, max supported size is %d", - maxIndex, RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)) + maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS)) .addContext("Column", inCol.name()) - .addContext("Maximum index", RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS) + .addContext("Maximum index", TextReader.MAXIMUM_NUMBER_COLUMNS) .build(logger); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java index 8352dfa07..1166a5234 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java @@ -69,6 +69,7 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File private List<FileSplit> spilts = new ArrayList<>(); private Iterator<FileSplit> splitIter; private Path scanRootDir; + private int partitionDepth; protected DrillFileSystem dfs; private FileMetadataManager metadataManager; @@ -82,12 +83,18 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File /** * Specify the selection root for a directory scan, if any. - * Used to populate partition columns. + * Used to populate partition columns. Also, specify the maximum + * partition depth. 
+ * * @param rootPath Hadoop file path for the directory + * @param partitionDepth maximum partition depth across all files + * within this logical scan operator (files in this scan may be + * shallower) */ - public void setSelectionRoot(Path rootPath) { + public void setSelectionRoot(Path rootPath, int partitionDepth) { this.scanRootDir = rootPath; + this.partitionDepth = partitionDepth; } @Override @@ -122,7 +129,10 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File metadataManager = new FileMetadataManager( context.getFragmentContext().getOptions(), + true, // Expand partition columns with wildcard + false, // Put partition columns after table columns scanRootDir, + partitionDepth, paths); scanOrchestrator.withMetadata(metadataManager); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java index ae8502b52..a4ace55be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java @@ -46,6 +46,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { private boolean hasImplicitCols; + private boolean expandPartitionsAtEnd; + public FileMetadataColumnsParser(FileMetadataManager metadataManager) { this.metadataManager = metadataManager; partitionPattern = Pattern.compile(metadataManager.partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE); @@ -123,8 +125,10 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { } private void buildWildcard() { - if (metadataManager.useLegacyWildcardExpansion && - metadataManager.useLegacyExpansionLocation) { + if (!metadataManager.useLegacyWildcardExpansion) { + return; + } + if 
(metadataManager.useLegacyExpansionLocation) { // Star column: this is a SELECT * query. @@ -134,6 +138,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { // set is constant across all files. expandPartitions(); + } else { + expandPartitionsAtEnd = true; } } @@ -144,8 +150,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { // feature to expand partitions for wildcards, and we want the // partitions after data columns. - if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion && - ! metadataManager.useLegacyExpansionLocation) { + if (expandPartitionsAtEnd) { expandPartitions(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java index ba49a9f54..201d30859 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java @@ -41,24 +41,51 @@ import org.apache.hadoop.fs.Path; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +/** + * Manages the insertion of file metadata (AKA "implicit" and partition) columns. + * Parses the file metadata columns from the projection list. Creates and loads + * the vectors that hold the data. If running in legacy mode, inserts partition + * columns when the query contains a wildcard. Supports renaming the columns via + * session options. + * <p> + * The lifecycle is that the manager is given the set of files for this scan + * operator so it can determine the partition depth. (Note that different scans + * may not agree on the depth. This is a known issue with Drill's implementation.) + * <p> + * Then, at the start of the scan, all projection columns are parsed. This class + * picks out the file metadata columns. 
+ * <p> + * On each file (on each reader), the columns are "resolved." Here, that means + * that the columns are filled in with actual values based on the present file. + * <p> + * This is the successor to {@link ColumnExplorer}. + */ + public class FileMetadataManager implements MetadataManager, SchemaProjectionResolver, VectorSource { + /** + * Automatically compute partition depth from files. Use only + * for testing! + */ + + public static final int AUTO_PARTITION_DEPTH = -1; + // Input - private Path scanRootDir; + private final Path scanRootDir; + private final int partitionCount; private FileMetadata currentFile; // Config protected final String partitionDesignator; - protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>(); - protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap(); + protected final List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>(); + protected final Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap(); /** * Indicates whether to expand partition columns when the query contains a wildcard. * Supports queries such as the following:<code><pre> - * select * from dfs.`partitioned-dir` - * </pre><code> + * select * from dfs.`partitioned-dir`</pre></code> * In which the output columns will be (columns, dir0) if the partitioned directory * has one level of nesting. * @@ -86,7 +113,6 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes private final List<MetadataColumn> metadataColumns = new ArrayList<>(); private ConstantColumnLoader loader; private VectorContainer outputContainer; - private final int partitionCount; /** * Specifies whether to plan based on the legacy meaning of "*". 
See @@ -111,10 +137,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes public FileMetadataManager(OptionSet optionManager, boolean useLegacyWildcardExpansion, boolean useLegacyExpansionLocation, - Path rootDir, List<Path> files) { + Path rootDir, + int partitionCount, + List<Path> files) { this.useLegacyWildcardExpansion = useLegacyWildcardExpansion; this.useLegacyExpansionLocation = useLegacyExpansionLocation; - scanRootDir = rootDir; partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); for (ImplicitFileColumns e : ImplicitFileColumns.values()) { @@ -129,28 +156,32 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes // The files and root dir are optional. - if (scanRootDir == null || files == null) { - partitionCount = 0; + if (rootDir == null || files == null) { + scanRootDir = null; + this.partitionCount = 0; // Special case in which the file is the same as the // root directory (occurs for a query with only one file.) - } else if (files.size() == 1 && scanRootDir.equals(files.get(0))) { + } else if (files.size() == 1 && rootDir.equals(files.get(0))) { scanRootDir = null; - partitionCount = 0; + this.partitionCount = 0; } else { + scanRootDir = rootDir; - // Compute the partitions. + // Compute the partitions. Normally the count is passed in. + // But, handle the case where the count is unknown. Note: use this + // convenience only in testing since, in production, it can result + // in different scans reporting different numbers of partitions. 
- partitionCount = computeMaxPartition(files); + if (partitionCount == -1) { + this.partitionCount = computeMaxPartition(files); + } else { + this.partitionCount = partitionCount; + } } } - public FileMetadataManager(OptionSet optionManager, - Path rootDir, List<Path> files) { - this(optionManager, false, false, rootDir, files); - } - private int computeMaxPartition(List<Path> files) { int maxLen = 0; for (Path filePath : files) { @@ -165,6 +196,17 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes this.vectorCache = vectorCache; } + /** + * Returns the file metadata column parser that: + * <ul> + * <li>Picks out the file metadata and partition columns,</li> + * <li>Inserts partition columns for a wildcard query, if the + * option to do so is set.</li> + * </ul> + * + * @see {@link #useLegacyWildcardExpansion} + */ + @Override public ScanProjectionParser projectionParser() { return parser; } @@ -223,6 +265,10 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes currentFile = null; } + /** + * Resolves metadata columns to concrete, materialized columns with the + * proper value for the present file. 
+ */ @Override public boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple, TupleMetadata tableSchema) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java index a5d6ca2a1..926936550 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java @@ -282,7 +282,9 @@ public class ScanSchemaOrchestrator { ScanProjectionParser parser = metadataManager.projectionParser(); if (parser != null) { - parsers.add(parser); + // Insert in first position so that it is ensured to see + // any wildcard that exists + parsers.add(0, parser); } // Parse the projection list. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java index f109578e9..be45d569e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java @@ -87,17 +87,17 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer { "width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth()); // 1.5 Cap the parallelization width by max allowed parallelization per node - width = Math.max(1, Math.min(width, endpointPool.size()*parameters.getMaxWidthPerNode())); + width = Math.max(1, Math.min(width, endpointPool.size() * parameters.getMaxWidthPerNode())); - // 1.6 Cap the parallelization width by total of max allowed width per node. 
The reason is if we the width is more, - // we end up allocating more work units to one or more endpoints that don't have those many work units. + // 1.6 Cap the parallelization width by total of max allowed width per node. The reason is if the width is more, + // we end up allocating more work units to one or more endpoints that don't have that many work units. width = Math.min(totalMaxWidth, width); // Step 2: Select the endpoints final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap(); // 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled. - for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) { + for (Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) { endpoints.put(entry.getKey(), 1); } int totalAssigned = endpoints.size(); @@ -105,15 +105,15 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer { // 2.2 Assign the remaining slots to endpoints proportional to the affinity of each endpoint int remainingSlots = width - endpoints.size(); while (remainingSlots > 0) { - for(EndpointAffinity epAf : endpointPool.values()) { + for (EndpointAffinity epAf : endpointPool.values()) { final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots); int currentAssignments = endpoints.get(epAf.getEndpoint()); - for(int i=0; - i < moreAllocation && + for (int i=0; + i < moreAllocation && totalAssigned < width && currentAssignments < parameters.getMaxWidthPerNode() && currentAssignments < epAf.getMaxWidth(); - i++) { + i++) { totalAssigned++; currentAssignments++; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 8de57bca0..39b699fe4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -179,7 +179,7 @@ public class SimpleParallelizer implements ParallelizationParameters { public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) { planningSet.get(rootFragment); - for(ExchangeFragmentPair fragmentPair : rootFragment) { + for (ExchangeFragmentPair fragmentPair : rootFragment) { initFragmentWrappers(fragmentPair.getNode(), planningSet); } } @@ -193,7 +193,7 @@ public class SimpleParallelizer implements ParallelizationParameters { private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) { // Set up dependency of fragments based on the affinity of exchange that separates the fragments. - for(Wrapper currentFragmentWrapper : planningSet) { + for (Wrapper currentFragmentWrapper : planningSet) { ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair(); if (sendingExchange != null) { ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency(); @@ -209,17 +209,17 @@ public class SimpleParallelizer implements ParallelizationParameters { // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and - // remove the fragment on which the current fragment depends on. + // remove the fragment on which the current fragment depends. 
final Set<Wrapper> roots = Sets.newHashSet(); - for(Wrapper w : planningSet) { + for (Wrapper w : planningSet) { roots.add(w); } - for(Wrapper wrapper : planningSet) { + for (Wrapper wrapper : planningSet) { final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); if (fragmentDependencies != null && fragmentDependencies.size() > 0) { - for(Wrapper dependency : fragmentDependencies) { + for (Wrapper dependency : fragmentDependencies) { if (roots.contains(dependency)) { roots.remove(dependency); } @@ -241,7 +241,7 @@ public class SimpleParallelizer implements ParallelizationParameters { return; } - // First parallelize fragments on which this fragment depends on. + // First parallelize fragments on which this fragment depends. final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies(); if (fragmentDependencies != null && fragmentDependencies.size() > 0) { for(Wrapper dependency : fragmentDependencies) { @@ -288,20 +288,20 @@ public class SimpleParallelizer implements ParallelizationParameters { Preconditions.checkArgument(op instanceof FragmentRoot); FragmentRoot root = (FragmentRoot) op; - FragmentHandle handle = FragmentHandle // - .newBuilder() // - .setMajorFragmentId(wrapper.getMajorFragmentId()) // - .setMinorFragmentId(minorFragmentId) // - .setQueryId(queryId) // + FragmentHandle handle = FragmentHandle + .newBuilder() + .setMajorFragmentId(wrapper.getMajorFragmentId()) + .setMinorFragmentId(minorFragmentId) + .setQueryId(queryId) .build(); - PlanFragment fragment = PlanFragment.newBuilder() // - .setForeman(foremanNode) // - .setHandle(handle) // - .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) // - .setLeafFragment(isLeafFragment) // + PlanFragment fragment = PlanFragment.newBuilder() + .setForeman(foremanNode) + .setHandle(handle) + .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) + .setLeafFragment(isLeafFragment) .setContext(queryContextInfo) - 
.setMemInitial(wrapper.getInitialAllocation())// + .setMemInitial(wrapper.getInitialAllocation()) .setMemMax(wrapper.getMaxAllocation()) .setCredentials(session.getCredentials()) .addAllCollector(CountRequiredFragments.getCollectors(root)) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index 760a6ba5b..a07d3ed58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -111,7 +111,7 @@ public class ScanPrel extends DrillScanRelBase implements Prel, HasDistributionA final ScanStats stats = this.getGroupScan().getScanStats(settings); final int columnCount = this.getRowType().getFieldCount(); - if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { + if (PrelUtil.getSettings(getCluster()).useDefaultCosting()) { return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java index b4fada61f..4d702a830 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java @@ -30,8 +30,8 @@ public class ScanPrule extends Prule{ public ScanPrule() { super(RelOptHelper.any(DrillScanRel.class), "Prel.ScanPrule"); - } + @Override public void onMatch(RelOptRuleCall call) { final DrillScanRel scan = (DrillScanRel) call.rel(0); @@ -48,5 +48,4 @@ public class ScanPrule extends Prule{ call.transformTo(newScan); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java index fa8e69d0b..267cecd4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java @@ -159,13 +159,13 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive class MajorFragmentStat { private DistributionAffinity distributionAffinity = DistributionAffinity.NONE; - private double maxRows = 0d; + private double maxRows; private int maxWidth = Integer.MAX_VALUE; - private boolean isMultiSubScan = false; - private boolean rightSideOfLateral = false; + private boolean isMultiSubScan; + private boolean rightSideOfLateral; //This flag if true signifies that all the Rels thus far //are simple rels with no distribution requirement. - private boolean isSimpleRel = false; + private boolean isSimpleRel; public void add(Prel prel) { maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows); @@ -196,7 +196,7 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive return false; } - int suggestedWidth = (int) Math.ceil((maxRows+1)/targetSliceSize); + int suggestedWidth = (int) Math.ceil((maxRows + 1) / targetSliceSize); int w = Math.min(maxWidth, suggestedWidth); if (w < 1) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 9cd670e99..e201e2686 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -242,6 +242,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new 
OptionDefinition(ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR), new OptionDefinition(ExecConstants.SCALAR_REPLACEMENT_VALIDATOR), new OptionDefinition(ExecConstants.ENABLE_NEW_TEXT_READER), + new OptionDefinition(ExecConstants.ENABLE_V3_TEXT_READER), + new OptionDefinition(ExecConstants.MIN_READER_WIDTH), new OptionDefinition(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST), new OptionDefinition(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE), new OptionDefinition(ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java index 33b500018..1ae2d5e9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java @@ -55,14 +55,30 @@ public class ColumnExplorer { */ public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) { this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); - this.columns = columns; - this.isStarQuery = columns != null && Utilities.isStarQuery(columns); this.selectedPartitionColumns = Lists.newArrayList(); this.tableColumns = Lists.newArrayList(); this.allImplicitColumns = initImplicitFileColumns(optionManager); this.selectedImplicitColumns = CaseInsensitiveMap.newHashMap(); + if (columns == null) { + isStarQuery = false; + this.columns = null; + } else { + this.columns = columns; + this.isStarQuery = Utilities.isStarQuery(columns); + init(); + } + } - init(); + /** + * Constructor for using the column explorer to probe existing columns in the + * {@link ProjectRecordBatch}. + */ + // TODO: This is awkward. This class is being used for two distinct things: + // 1. The definition of the metadata columns, and + // 2. The projection of metadata columns in a particular query. + // Would be better to separate these two concepts. 
+ public ColumnExplorer(OptionManager optionManager) { + this(optionManager, null); } /** @@ -123,6 +139,15 @@ public class ColumnExplorer { return matcher.matches(); } + public boolean isImplicitColumn(String name) { + return isPartitionColumn(partitionDesignator, name) || + isImplicitFileColumn(name); + } + + public boolean isImplicitFileColumn(String name) { + return allImplicitColumns.get(name) != null; + } + /** * Returns list with partition column names. * For the case when table has several levels of nesting, max level is chosen. @@ -132,10 +157,24 @@ public class ColumnExplorer { * @return list with partition column names. */ public static List<String> getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) { - int partitionsCount = 0; + int partitionsCount = getPartitionDepth(selection); + + String partitionColumnLabel = schemaConfig.getOption( + ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; + List<String> partitions = new ArrayList<>(); + + // generates partition column names: dir0, dir1 etc. + for (int i = 0; i < partitionsCount; i++) { + partitions.add(partitionColumnLabel + i); + } + return partitions; + } + + public static int getPartitionDepth(FileSelection selection) { // a depth of table root path int rootDepth = selection.getSelectionRoot().depth(); + int partitionsCount = 0; for (Path file : selection.getFiles()) { // Calculates partitions count for the concrete file: // depth of file path - depth of table root path - 1. @@ -145,15 +184,7 @@ public class ColumnExplorer { // max depth of files path should be used to handle all partitions partitionsCount = Math.max(partitionsCount, currentPartitionsCount); } - - String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; - List<String> partitions = new ArrayList<>(); - - // generates partition column names: dir0, dir1 etc. 
- for (int i = 0; i < partitionsCount; i++) { - partitions.add(partitionColumnLabel + i); - } - return partitions; + return partitionsCount; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java index 13017fadc..254cddbe3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java @@ -73,5 +73,4 @@ public interface FormatPlugin { Configuration getFsConf(); DrillbitContext getContext(); String getName(); - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index d76c6489e..dc1f053a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.drill.shaded.guava.com.google.common.base.Functions; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; @@ -39,10 +40,16 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.physical.impl.WriterRecordBatch; import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch; +import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch; +import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec; +import 
org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; @@ -55,67 +62,335 @@ import org.apache.drill.exec.store.dfs.FormatMatcher; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.schedule.CompleteFileWork; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +/** + * Base class for various file readers. + * <p> + * This version provides a bridge between the legacy {@link RecordReader}-style + * readers and the newer {@link FileBatchReader} style. Over time, split the + * class, or provide a cleaner way to handle the differences. + * + * @param <T> the format plugin config for this reader + */ + public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin { - @SuppressWarnings("unused") - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class); + /** + * Defines the static, programmer-defined options for this plugin. These + * options are attributes of how the plugin works. The plugin config, + * defined in the class definition, provides user-defined options that can + * vary across uses of the plugin. 
+ */ + + public static class EasyFormatConfig { + public BasicFormatMatcher matcher; + public boolean readable = true; + public boolean writable; + public boolean blockSplittable; + public boolean compressible; + public Configuration fsConf; + public List<String> extensions; + public String defaultName; + + // Config options that, prior to Drill 1.15, required the plugin to + // override methods. Moving forward, plugins should be migrated to + // use this simpler form. New plugins should use these options + // instead of overriding methods. + + public boolean supportsProjectPushdown; + public boolean supportsAutoPartitioning; + public int readerOperatorType = -1; + public int writerOperatorType = -1; + } + + /** + * Creates the scan batch to use with the plugin. Drill supports the "classic" + * style of scan batch and readers, along with the newer size-aware, + * component-based version. The implementation of this class assembles the + * readers and scan batch operator as needed for each version. + */ + + public interface ScanBatchCreator { + CloseableRecordBatch buildScan( + final FragmentContext context, EasySubScan scan) + throws ExecutionSetupException; + } + + /** + * Use the original scanner based on the {@link RecordReader} interface. + * Requires that the storage plugin roll its own solutions for null columns. + * Is not able to limit vector or batch sizes. Retained for backward + * compatibility with "classic" format plugins which have not yet been + * upgraded to use the new framework. + */ + + public static class ClassicScanBatchCreator implements ScanBatchCreator { + + private final EasyFormatPlugin<? extends FormatPluginConfig> plugin; + + public ClassicScanBatchCreator(EasyFormatPlugin<? 
extends FormatPluginConfig> plugin) { + this.plugin = plugin; + } + + @Override + public CloseableRecordBatch buildScan( + final FragmentContext context, EasySubScan scan) throws ExecutionSetupException { + final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns()); + + if (! columnExplorer.isStarQuery()) { + scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), + columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth()); + scan.setOperatorId(scan.getOperatorId()); + } + + final OperatorContext oContext = context.newOperatorContext(scan); + final DrillFileSystem dfs; + try { + dfs = oContext.newFileSystem(plugin.easyConfig().fsConf); + } catch (final IOException e) { + throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e); + } + + final List<RecordReader> readers = new LinkedList<>(); + final List<Map<String, String>> implicitColumns = Lists.newArrayList(); + Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap(); + final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null; + for (final FileWork work : scan.getWorkUnits()) { + final RecordReader recordReader = getRecordReader( + plugin, context, dfs, work, scan.getColumns(), scan.getUserName()); + readers.add(recordReader); + final List<String> partitionValues = ColumnExplorer.listPartitionValues( + work.getPath(), scan.getSelectionRoot(), false); + final Map<String, String> implicitValues = columnExplorer.populateImplicitColumns( + work.getPath(), partitionValues, supportsFileImplicitColumns); + implicitColumns.add(implicitValues); + if (implicitValues.size() > mapWithMaxColumns.size()) { + mapWithMaxColumns = implicitValues; + } + } + + // all readers should have the same number of implicit columns, add missing ones with value null + final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null)); 
+ for (final Map<String, String> map : implicitColumns) { + map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); + } + + return new ScanBatch(context, oContext, readers, implicitColumns); + } + + /** + * Create a record reader given a file system, a file description and other + * information. For backward compatibility, calls the plugin method by + * default. + * + * @param plugin + * the plugin creating the scan + * @param context + * fragment context for the fragment running the scan + * @param dfs + * Drill's distributed file system facade + * @param fileWork + * description of the file to scan + * @param columns + * list of columns to project + * @param userName + * the name of the user performing the scan + * @return a record reader for the given file + * @throws ExecutionSetupException + * if anything goes wrong + */ + + public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin, + FragmentContext context, DrillFileSystem dfs, FileWork fileWork, + List<SchemaPath> columns, String userName) throws ExecutionSetupException { + return plugin.getRecordReader(context, dfs, fileWork, columns, userName); + } + } + + /** + * Revised scanner based on the revised + * {@link ResultSetLoader} and {@link RowBatchReader} classes. + * Handles most projection tasks automatically. Able to limit + * vector and batch sizes. Use this for new format plugins. + */ + + public abstract static class ScanFrameworkCreator + implements ScanBatchCreator { + + protected EasyFormatPlugin<? extends FormatPluginConfig> plugin; + + public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) { + this.plugin = plugin; + } + + /** + * Builds the revised {@link FileBatchReader}-based scan batch. 
+ * + * @param context + * @param scan + * @return + * @throws ExecutionSetupException + */ + + @Override + public CloseableRecordBatch buildScan( + final FragmentContext context, + final EasySubScan scan) throws ExecutionSetupException { + + // Assemble the scan operator and its wrapper. + + try { + final BaseFileScanFramework<?> framework = buildFramework(scan); + final Path selectionRoot = scan.getSelectionRoot(); + if (selectionRoot != null) { + framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth()); + } + return new OperatorRecordBatch( + context, scan, + new ScanOperatorExec( + framework)); + } catch (final UserException e) { + // Rethrow user exceptions directly + throw e; + } catch (final Throwable e) { + // Wrap all others + throw new ExecutionSetupException(e); + } + } + + /** + * Create the plugin-specific framework that manages the scan. The framework + * creates batch readers one by one for each file or block. It defines semantic + * rules for projection. It handles "early" or "late" schema readers. A typical + * framework builds on standardized frameworks for files in general or text + * files in particular. + * + * @param scan the physical operation definition for the scan operation. Contains + * one or more files to read. (The Easy format plugin works only for files.) + * @return the scan framework which orchestrates the scan operation across + * potentially many files + * @throws ExecutionSetupException for all setup failures + */ + protected abstract BaseFileScanFramework<?> buildFramework( + EasySubScan scan) throws ExecutionSetupException; + } + + /** + * Generic framework creator for files that just use the basic file + * support: metadata, etc. Specialized use cases (special "columns" + * column, say) will require a specialized implementation. 
+ */ + + public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator { + + private final FileReaderFactory readerCreator; + + public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin, + FileReaderFactory readerCreator) { + super(plugin); + this.readerCreator = readerCreator; + } + + @Override + protected FileScanFramework buildFramework( + EasySubScan scan) throws ExecutionSetupException { - private final BasicFormatMatcher matcher; + final FileScanFramework framework = new FileScanFramework( + scan.getColumns(), + scan.getWorkUnits(), + plugin.easyConfig().fsConf, + readerCreator); + return framework; + } + } + + private final String name; + private final EasyFormatConfig easyConfig; private final DrillbitContext context; - private final boolean readable; - private final boolean writable; - private final boolean blockSplittable; - private final Configuration fsConf; private final StoragePluginConfig storageConfig; protected final T formatConfig; - private final String name; - private final boolean compressible; + /** + * Legacy constructor. + */ protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf, - StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable, - boolean compressible, List<String> extensions, String defaultName){ - this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible); - this.readable = readable; - this.writable = writable; + StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, + boolean blockSplittable, + boolean compressible, List<String> extensions, String defaultName) { + this.name = name == null ? 
defaultName : name; + easyConfig = new EasyFormatConfig(); + easyConfig.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible); + easyConfig.readable = readable; + easyConfig.writable = writable; this.context = context; - this.blockSplittable = blockSplittable; - this.compressible = compressible; - this.fsConf = fsConf; + easyConfig.blockSplittable = blockSplittable; + easyConfig.compressible = compressible; + easyConfig.fsConf = fsConf; this.storageConfig = storageConfig; this.formatConfig = formatConfig; - this.name = name == null ? defaultName : name; } - @Override - public Configuration getFsConf() { - return fsConf; + /** + * Revised constructor in which settings are gathered into a configuration object. + * + * @param name name of the plugin + * @param config configuration options for this plugin which determine + * developer-defined runtime behavior + * @param context the global server-wide drillbit context + * @param storageConfig the configuration for the storage plugin that owns this + * format plugin + * @param formatConfig the Jackson-serialized format configuration as created + * by the user in the Drill web console. Holds user-defined options. 
+ */ + + protected EasyFormatPlugin(String name, EasyFormatConfig config, DrillbitContext context, + StoragePluginConfig storageConfig, T formatConfig) { + this.name = name; + this.easyConfig = config; + this.context = context; + this.storageConfig = storageConfig; + this.formatConfig = formatConfig; + if (easyConfig.matcher == null) { + easyConfig.matcher = new BasicFormatMatcher(this, + easyConfig.fsConf, easyConfig.extensions, + easyConfig.compressible); + } } @Override - public DrillbitContext getContext() { - return context; - } + public Configuration getFsConf() { return easyConfig.fsConf; } @Override - public String getName() { - return name; - } + public DrillbitContext getContext() { return context; } - public abstract boolean supportsPushDown(); + public EasyFormatConfig easyConfig() { return easyConfig; } + + @Override + public String getName() { return name; } /** - * Whether or not you can split the format based on blocks within file boundaries. If not, the simple format engine will - * only split on file boundaries. + * Does this plugin support projection push down? That is, can the reader + * itself handle the tasks of projecting table columns, creating null + * columns for missing table columns, and so on? * - * @return True if splittable. + * @return <tt>true</tt> if the plugin supports projection push-down, + * <tt>false</tt> if Drill should do the task by adding a project operator */ - public boolean isBlockSplittable() { - return blockSplittable; - } + + public boolean supportsPushDown() { return easyConfig.supportsProjectPushdown; } + + /** + * Whether or not you can split the format based on blocks within file + * boundaries. If not, the simple format engine will only split on file + * boundaries. + * + * @return <code>true</code> if splittable. 
+ */ + public boolean isBlockSplittable() { return easyConfig.blockSplittable; } /** * Indicates whether or not this format could also be in a compression @@ -125,52 +400,40 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements * * @return <code>true</code> if it is compressible */ - public boolean isCompressible() { - return compressible; - } - - public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, - List<SchemaPath> columns, String userName) throws ExecutionSetupException; + public boolean isCompressible() { return easyConfig.compressible; } - CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException { - final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns()); - - if (!columnExplorer.isStarQuery()) { - scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), - columnExplorer.getTableColumns(), scan.getSelectionRoot()); - scan.setOperatorId(scan.getOperatorId()); - } + /** + * Return a record reader for the specific file format, when using the original + * {@link ScanBatch} scanner. 
+ * @param context fragment context + * @param dfs Drill file system + * @param fileWork metadata about the file to be scanned + * @param columns list of projected columns (or may just contain the wildcard) + * @param userName the name of the user running the query + * @return a record reader for this format + * @throws ExecutionSetupException for many reasons + */ - OperatorContext oContext = context.newOperatorContext(scan); - final DrillFileSystem dfs; - try { - dfs = oContext.newFileSystem(fsConf); - } catch (IOException e) { - throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e); - } + public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, + List<SchemaPath> columns, String userName) throws ExecutionSetupException { + throw new ExecutionSetupException("Must implement getRecordReader() if using the legacy scanner."); + } - List<RecordReader> readers = new LinkedList<>(); - List<Map<String, String>> implicitColumns = Lists.newArrayList(); - Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap(); - boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null; - for (FileWork work : scan.getWorkUnits()){ - RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName()); - readers.add(recordReader); - List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false); - Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns); - implicitColumns.add(implicitValues); - if (implicitValues.size() > mapWithMaxColumns.size()) { - mapWithMaxColumns = implicitValues; - } - } + protected CloseableRecordBatch getReaderBatch(final FragmentContext context, + final EasySubScan scan) throws ExecutionSetupException { + return scanBatchCreator(context.getOptions()).buildScan(context, scan); + } - // all 
readers should have the same number of implicit columns, add missing ones with value null - Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null)); - for (Map<String, String> map : implicitColumns) { - map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); - } + /** + * Create the scan batch creator. Needed only when using the revised scan batch. In that + * case, override the <tt>readerIterator()</tt> method on the custom scan batch + * creator implementation. + * + * @return the strategy for creating the scan batch for this plugin + */ - return new ScanBatch(context, oContext, readers, implicitColumns); + protected ScanBatchCreator scanBatchCreator(OptionManager options) { + return new ClassicScanBatchCreator(this); } public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) { @@ -219,41 +482,28 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements } @Override - public T getConfig() { - return formatConfig; - } + public T getConfig() { return formatConfig; } @Override - public StoragePluginConfig getStorageConfig() { - return storageConfig; - } + public StoragePluginConfig getStorageConfig() { return storageConfig; } @Override - public boolean supportsRead() { - return readable; - } + public boolean supportsRead() { return easyConfig.readable; } @Override - public boolean supportsWrite() { - return writable; - } + public boolean supportsWrite() { return easyConfig.writable; } @Override - public boolean supportsAutoPartitioning() { - return false; - } + public boolean supportsAutoPartitioning() { return easyConfig.supportsAutoPartitioning; } @Override - public FormatMatcher getMatcher() { - return matcher; - } + public FormatMatcher getMatcher() { return easyConfig.matcher; } @Override public Set<StoragePluginOptimizerRule> getOptimizerRules() { return ImmutableSet.of(); } - public abstract int getReaderOperatorType(); - public abstract int 
getWriterOperatorType(); - + public int getReaderOperatorType() { return easyConfig.readerOperatorType; } + public int getWriterOperatorType() { return easyConfig.writerOperatorType; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 4449ec054..6a6243cd4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; @@ -57,9 +58,11 @@ import org.apache.hadoop.fs.Path; public class EasyGroupScan extends AbstractFileGroupScan { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class); - private FileSelection selection; private final EasyFormatPlugin<?> formatPlugin; + private FileSelection selection; + private int partitionDepth; private int maxWidth; + private int minWidth = 1; private List<SchemaPath> columns; private ListMultimap<Integer, CompleteFileWork> mappings; @@ -104,9 +107,23 @@ public class EasyGroupScan extends AbstractFileGroupScan { initFromSelection(selection, formatPlugin); } - @JsonIgnore - public Iterable<CompleteFileWork> getWorkIterable() { - return () -> Iterators.unmodifiableIterator(chunks.iterator()); + public EasyGroupScan( + String userName, + FileSelection selection, + EasyFormatPlugin<?> formatPlugin, + List<SchemaPath> columns, + Path selectionRoot, 
+ int minWidth + ) throws IOException{ + this(userName, selection, formatPlugin, columns, selectionRoot); + + // Set the minimum width of this reader. Primarily used for testing + // to force parallelism even for small test files. + // See ExecConstants.MIN_READER_WIDTH + this.minWidth = Math.max(1, Math.min(minWidth, maxWidth)); + + // Compute the maximum partition depth across all files. + partitionDepth = ColumnExplorer.getPartitionDepth(selection); } private EasyGroupScan(final EasyGroupScan that) { @@ -118,17 +135,23 @@ public class EasyGroupScan extends AbstractFileGroupScan { chunks = that.chunks; endpointAffinities = that.endpointAffinities; maxWidth = that.maxWidth; + minWidth = that.minWidth; mappings = that.mappings; + partitionDepth = that.partitionDepth; + } + + @JsonIgnore + public Iterable<CompleteFileWork> getWorkIterable() { + return () -> Iterators.unmodifiableIterator(chunks.iterator()); } private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException { - @SuppressWarnings("resource") final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf()); this.selection = selection; BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits()); - this.chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable()); - this.maxWidth = chunks.size(); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); + chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable()); + maxWidth = chunks.size(); + endpointAffinities = AffinityCreator.getAffinityMap(chunks); } public Path getSelectionRoot() { @@ -136,11 +159,16 @@ public class EasyGroupScan extends AbstractFileGroupScan { } @Override + @JsonIgnore + public int getMinParallelizationWidth() { + return minWidth; + } + + @Override public int getMaxParallelizationWidth() { return maxWidth; } - @Override public ScanStats getScanStats(final 
PlannerSettings settings) { return formatPlugin.getScanStats(settings, this); @@ -163,7 +191,6 @@ public class EasyGroupScan extends AbstractFileGroupScan { return columns; } - @JsonIgnore public FileSelection getFileSelection() { return selection; @@ -180,7 +207,6 @@ public class EasyGroupScan extends AbstractFileGroupScan { return new EasyGroupScan(this); } - @Override public List<EndpointAffinity> getOperatorAffinity() { if (endpointAffinities == null) { @@ -217,7 +243,8 @@ public class EasyGroupScan extends AbstractFileGroupScan { Preconditions.checkArgument(!filesForMinor.isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId)); - EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot); + EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, + columns, selectionRoot, partitionDepth); subScan.setOperatorId(this.getOperatorId()); return subScan; } @@ -275,5 +302,4 @@ public class EasyGroupScan extends AbstractFileGroupScan { public boolean canPushdownProjects(List<SchemaPath> columns) { return formatPlugin.supportsPushDown(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java index fbb3f475c..c51c7ac0f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java @@ -42,7 +42,8 @@ public class EasySubScan extends AbstractSubScan{ private final List<FileWorkImpl> files; private final EasyFormatPlugin<?> formatPlugin; private final List<SchemaPath> columns; - private Path selectionRoot; + private final Path selectionRoot; + private final int partitionDepth; @JsonCreator public EasySubScan( @@ -52,7 +53,8 @@ public class EasySubScan extends AbstractSubScan{ @JsonProperty("format") 
FormatPluginConfig formatConfig, @JacksonInject StoragePluginRegistry engineRegistry, @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("selectionRoot") Path selectionRoot + @JsonProperty("selectionRoot") Path selectionRoot, + @JsonProperty("partitionDepth") int partitionDepth ) throws ExecutionSetupException { super(userName); this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig); @@ -60,50 +62,40 @@ public class EasySubScan extends AbstractSubScan{ this.files = files; this.columns = columns; this.selectionRoot = selectionRoot; + this.partitionDepth = partitionDepth; } - public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, - Path selectionRoot){ + public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, + List<SchemaPath> columns, Path selectionRoot, int partitionDepth) { super(userName); this.formatPlugin = plugin; this.files = files; this.columns = columns; this.selectionRoot = selectionRoot; + this.partitionDepth = partitionDepth; } @JsonProperty - public Path getSelectionRoot() { - return selectionRoot; - } + public Path getSelectionRoot() { return selectionRoot; } + + @JsonProperty + public int getPartitionDepth() { return partitionDepth; } @JsonIgnore - public EasyFormatPlugin<?> getFormatPlugin(){ - return formatPlugin; - } + public EasyFormatPlugin<?> getFormatPlugin() { return formatPlugin; } @JsonProperty("files") - public List<FileWorkImpl> getWorkUnits() { - return files; - } + public List<FileWorkImpl> getWorkUnits() { return files; } @JsonProperty("storage") - public StoragePluginConfig getStorageConfig(){ - return formatPlugin.getStorageConfig(); - } + public StoragePluginConfig getStorageConfig() { return formatPlugin.getStorageConfig(); } @JsonProperty("format") - public FormatPluginConfig getFormatConfig(){ - return formatPlugin.getConfig(); - } + public FormatPluginConfig 
getFormatConfig() { return formatPlugin.getConfig(); } @JsonProperty("columns") - public List<SchemaPath> getColumns(){ - return columns; - } + public List<SchemaPath> getColumns() { return columns; } @Override - public int getOperatorType() { - return formatPlugin.getReaderOperatorType(); - } - + public int getOperatorType() { return formatPlugin.getReaderOperatorType(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 03ae6f554..05ed1b564 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -18,34 +18,45 @@ package org.apache.drill.exec.store.easy.text; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator; +import 
org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.FileReaderCreator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; import org.apache.drill.exec.store.dfs.easy.EasyGroupScan; +import org.apache.drill.exec.store.dfs.easy.EasySubScan; import org.apache.drill.exec.store.dfs.easy.EasyWriter; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader; import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings; +import org.apache.drill.exec.store.easy.text.compliant.v3.CompliantTextBatchReader; +import org.apache.drill.exec.store.easy.text.compliant.v3.TextParsingSettingsV3; import org.apache.drill.exec.store.schedule.CompleteFileWork; import org.apache.drill.exec.store.text.DrillTextRecordReader; import org.apache.drill.exec.store.text.DrillTextRecordWriter; @@ -61,91 +72,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> { - private final static String DEFAULT_NAME = "text"; +public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> + implements FileReaderCreator { + private final static String PLUGIN_NAME = 
"text"; - public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) { - super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true, - Collections.emptyList(), DEFAULT_NAME); - } - - public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, - TextFormatConfig formatPluginConfig) { - super(name, context, fsConf, config, formatPluginConfig, true, false, true, true, - formatPluginConfig.getExtensions(), DEFAULT_NAME); - } - - - @Override - public RecordReader getRecordReader(FragmentContext context, - DrillFileSystem dfs, - FileWork fileWork, - List<SchemaPath> columns, - String userName) { - Path path = dfs.makeQualified(fileWork.getPath()); - FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""}); - - if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) { - TextParsingSettings settings = new TextParsingSettings(); - settings.set(formatConfig); - return new CompliantTextRecordReader(split, dfs, settings, columns); - } else { - char delim = formatConfig.getFieldDelimiter(); - return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns); - } - } - - @Override - public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) - throws IOException { - return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot); - } - - @Override - public boolean supportsStatistics() { - return false; - } - - @Override - public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { - throw new UnsupportedOperationException("unimplemented"); - } - - @Override - public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException { - throw new UnsupportedOperationException("unimplemented"); - } - - @Override - protected 
ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) { - long data = 0; - for (final CompleteFileWork work : scan.getWorkIterable()) { - data += work.getTotalBytes(); - } - final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE); - final double estRowCount = data / estimatedRowSize; - return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data); - } - - @Override - public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException { - final Map<String, String> options = new HashMap<>(); - - options.put("location", writer.getLocation()); - FragmentHandle handle = context.getHandle(); - String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); - options.put("prefix", fragmentId); - options.put("separator", getConfig().getFieldDelimiterAsString()); - options.put("extension", getConfig().getExtensions().get(0)); - - RecordWriter recordWriter = new DrillTextRecordWriter( - context.getAllocator(), writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf()); - recordWriter.init(options); - - return recordWriter; - } - - @JsonTypeName("text") @JsonInclude(Include.NON_DEFAULT) + @JsonTypeName(PLUGIN_NAME) + @JsonInclude(Include.NON_DEFAULT) public static class TextFormatConfig implements FormatPluginConfig { public List<String> extensions = ImmutableList.of(); @@ -157,34 +89,18 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm public boolean skipFirstLine = false; public boolean extractHeader = false; - public List<String> getExtensions() { - return extensions; - } - - public char getQuote() { - return quote; - } - - public char getEscape() { - return escape; - } + public TextFormatConfig() { } - public char getComment() { - return comment; - } - - public String getLineDelimiter() { - return lineDelimiter; - } - - public char getFieldDelimiter() { 
- return fieldDelimiter; - } + public List<String> getExtensions() { return extensions; } + public char getQuote() { return quote; } + public char getEscape() { return escape; } + public char getComment() { return comment; } + public String getLineDelimiter() { return lineDelimiter; } + public char getFieldDelimiter() { return fieldDelimiter; } + public boolean isSkipFirstLine() { return skipFirstLine; } @JsonIgnore - public boolean isHeaderExtractionEnabled() { - return extractHeader; - } + public boolean isHeaderExtractionEnabled() { return extractHeader; } @JsonIgnore public String getFieldDelimiterAsString(){ @@ -197,10 +113,6 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm this.fieldDelimiter = delimiter; } - public boolean isSkipFirstLine() { - return skipFirstLine; - } - @Override public int hashCode() { final int prime = 31; @@ -262,24 +174,173 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm } return true; } + } + + public static class TextScanBatchCreator extends ScanFrameworkCreator { + + private final FileReaderCreator readerCreator; + private final TextFormatPlugin textPlugin; + + public TextScanBatchCreator(TextFormatPlugin plugin, + FileReaderCreator readerCreator) { + super(plugin); + this.readerCreator = readerCreator; + textPlugin = plugin; + } + + @Override + protected ColumnsScanFramework buildFramework( + EasySubScan scan) throws ExecutionSetupException { + ColumnsScanFramework framework = new ColumnsScanFramework( + scan.getColumns(), + scan.getWorkUnits(), + plugin.easyConfig().fsConf, + readerCreator); + + // If this format has no headers, or wants to skip them, + // then we must use the columns column to hold the data. + + framework.requireColumnsArray( + ! textPlugin.getConfig().isHeaderExtractionEnabled()); + + // Text files handle nulls in an unusual way. Missing columns + // are set to required Varchar and filled with blanks. 
Yes, this + // means that the SQL statement or code cannot differentiate missing + // columns from empty columns, but that is how CSV and other text + // files have been defined within Drill. + + framework.setNullType( + MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.REQUIRED) + .build()); + + return framework; + } + } + + public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) { + this(name, context, fsConf, storageConfig, new TextFormatConfig()); + } + + public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, + TextFormatConfig formatPluginConfig) { + super(name, easyConfig(fsConf, formatPluginConfig), context, config, formatPluginConfig); + } + + private static EasyFormatConfig easyConfig(Configuration fsConf, TextFormatConfig pluginConfig) { + EasyFormatConfig config = new EasyFormatConfig(); + config.readable = true; + config.writable = true; + config.blockSplittable = true; + config.compressible = true; + config.supportsProjectPushdown = true; + config.extensions = pluginConfig.getExtensions(); + config.fsConf = fsConf; + config.defaultName = PLUGIN_NAME; + config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE; + config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE; + return config; + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) + throws IOException { + return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot); + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List<SchemaPath> columns, OptionManager options) throws IOException { + return new EasyGroupScan(userName, selection, this, columns, + selection.selectionRoot, + // Some paths provide a null option manager. In that case, default to a + // min width of 1; just like the base class. 
+ options == null ? 1 : (int) options.getLong(ExecConstants.MIN_READER_WIDTH_KEY)); + } + + @Override + protected ScanBatchCreator scanBatchCreator(OptionManager options) { + // Create the "legacy", "V2" reader or the new "V3" version based on + // the result set loader. This code should be temporary: the two + // readers provide identical functionality for the user; only the + // internals differ. + if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) { + return new TextScanBatchCreator(this, this); + } else { + return new ClassicScanBatchCreator(this); + } + } + + // TODO: Remove this once the V2 reader is removed. + @Override + public RecordReader getRecordReader(FragmentContext context, + DrillFileSystem dfs, + FileWork fileWork, + List<SchemaPath> columns, + String userName) { + Path path = dfs.makeQualified(fileWork.getPath()); + FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""}); + + if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) { + TextParsingSettings settings = new TextParsingSettings(); + settings.set(formatConfig); + return new CompliantTextRecordReader(split, dfs, settings, columns); + } else { + char delim = formatConfig.getFieldDelimiter(); + return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns); + } + } + @Override + public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException { + final Map<String, String> options = new HashMap<>(); + options.put("location", writer.getLocation()); + FragmentHandle handle = context.getHandle(); + String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); + options.put("prefix", fragmentId); + options.put("separator", getConfig().getFieldDelimiterAsString()); + options.put("extension", getConfig().getExtensions().get(0)); + RecordWriter recordWriter = new DrillTextRecordWriter( + context.getAllocator(), 
writer.getStorageStrategy(), writer.getFormatPlugin().getFsConf()); + recordWriter.init(options); + + return recordWriter; } @Override - public int getReaderOperatorType() { - return CoreOperatorType.TEXT_SUB_SCAN_VALUE; + public ManagedReader<ColumnsSchemaNegotiator> makeBatchReader( + DrillFileSystem dfs, + FileSplit split) throws ExecutionSetupException { + TextParsingSettingsV3 settings = new TextParsingSettingsV3(); + settings.set(getConfig()); + return new CompliantTextBatchReader(split, dfs, settings); + } + @Override + public boolean supportsStatistics() { + return false; } @Override - public int getWriterOperatorType() { - return CoreOperatorType.TEXT_WRITER_VALUE; + public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { + throw new UnsupportedOperationException("unimplemented"); } @Override - public boolean supportsPushDown() { - return true; + public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException { + throw new UnsupportedOperationException("unimplemented"); } + @Override + protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) { + long data = 0; + for (final CompleteFileWork work : scan.getWorkIterable()) { + data += work.getTotalBytes(); + } + final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE); + final double estRowCount = data / estimatedRowSize; + return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java new file mode 100644 index 000000000..0ed3155a0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Original version of the "compliant" text reader. This is version 2 of + * the text reader. This version is retained for temporary backward + * compatibility as we productize the newer version 3 based on the + * row set framework. + * <p> + * TODO: Remove the files in this package and move the files from the + * "v3" sub-package here once the version 3 implementation stabilizes. + */ +package org.apache.drill.exec.store.easy.text.compliant; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java new file mode 100644 index 000000000..6bf0bb69c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; + +public abstract class BaseFieldOutput extends TextOutput { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); + private static final int MAX_FIELD_LENGTH = 1024 * 64; + + // track which field is getting appended + protected int currentFieldIndex = -1; + // track chars within field + protected int currentDataPointer; + // track if field is still getting appended + private boolean fieldOpen = true; + // holds chars for a field + protected byte[] fieldBytes; + protected final RowSetLoader writer; + private final boolean[] projectionMask; + protected final int maxField; + protected boolean fieldProjected; + + /** + * Initialize the field output for one of three scenarios: + * <ul> + * <li>SELECT all: SELECT *, SELECT columns. Indicated by a non -1 + * max fields.</li> + * <li>SELECT none: SELECT COUNT(*), etc. Indicated by a max field + * of -1.</li> + * <li>SELECT a, b, c indicated by a non-null projection mask that + * identifies the indexes of the fields to be selected. 
In this case, + * this constructor computes the maximum field.</li> + * </ul> + * + * @param writer Row set writer that provides access to the writer for + * each column + * @param maxField the index of the last field to store. May be -1 if no + * fields are to be stored. Computed if the projection mask is set + * @param projectionMask a boolean array indicating which fields are + * to be projected to the output. Optional + */ + + public BaseFieldOutput(RowSetLoader writer, int maxField, boolean[] projectionMask) { + this.writer = writer; + this.projectionMask = projectionMask; + + // If no projection mask is defined, then we want all columns + // up to the max field, which may be -1 if we want to select + // nothing. + + if (projectionMask == null) { + this.maxField = maxField; + } else { + + // Otherwise, use the projection mask to determine + // which fields are to be projected. (The file may well + // contain more than the projected set.) + + int end = projectionMask.length - 1; + while (end >= 0 && ! projectionMask[end]) { + end--; + } + this.maxField = end; + } + + // If we project at least one field, allocate a buffer. + + if (maxField >= 0) { + fieldBytes = new byte[MAX_FIELD_LENGTH]; + } + } + + /** + * Start a new record. Resets all pointers. + */ + + @Override + public void startRecord() { + currentFieldIndex = -1; + fieldOpen = false; + writer.start(); + } + + @Override + public void startField(int index) { + assert index == currentFieldIndex + 1; + currentFieldIndex = index; + currentDataPointer = 0; + fieldOpen = true; + + // Figure out if this field is projected. + + if (projectionMask == null) { + fieldProjected = currentFieldIndex <= maxField; + } else if (currentFieldIndex >= projectionMask.length) { + fieldProjected = false; + } else { + fieldProjected = projectionMask[currentFieldIndex]; + } + } + + @Override + public void append(byte data) { + if (! 
fieldProjected) { + return; + } + if (currentDataPointer >= MAX_FIELD_LENGTH - 1) { + throw UserException + .unsupportedError() + .message("Text column is too large.") + .addContext("Column", currentFieldIndex) + .addContext("Limit", MAX_FIELD_LENGTH) + .build(logger); + } + + fieldBytes[currentDataPointer++] = data; + } + + @Override + public boolean endField() { + fieldOpen = false; + return currentFieldIndex < maxField; + } + + @Override + public boolean endEmptyField() { + return endField(); + } + + @Override + public void finishRecord() { + if (fieldOpen) { + endField(); + } + writer.save(); + } + + @Override + public long getRecordCount() { + return writer.rowCount(); + } + + @Override + public boolean isFull() { + return writer.isFull(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java new file mode 100644 index 000000000..e489003c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/CompliantTextBatchReader.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager; +import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.hadoop.mapred.FileSplit; + +import com.univocity.parsers.common.TextParsingException; + +import io.netty.buffer.DrillBuf; + +/** + * New text reader, complies with the RFC 4180 standard for text/csv files + */ +public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextBatchReader.class); + + private static final int MAX_RECORDS_PER_BATCH = 8096; + private static final int READ_BUFFER = 1024 * 1024; + private static final int WHITE_SPACE_BUFFER = 64 * 1024; + + // settings to be used while parsing + private final TextParsingSettingsV3 settings; + // Chunk of the file to be read by this reader + private final FileSplit split; + // text reader implementation + private TextReader reader; + // input buffer + private DrillBuf readBuffer; + // working buffer to handle whitespaces + private DrillBuf whitespaceBuffer; + private final 
DrillFileSystem dfs; + + private RowSetLoader writer; + + public CompliantTextBatchReader(FileSplit split, DrillFileSystem dfs, TextParsingSettingsV3 settings) { + this.split = split; + this.settings = settings; + this.dfs = dfs; + + // Validate. Otherwise, these problems show up later as a data + // read error which is very confusing. + + if (settings.getNewLineDelimiter().length == 0) { + throw UserException + .validationError() + .message("The text format line delimiter cannot be blank.") + .build(logger); + } + } + + /** + * Performs the initial setup required for the record reader. + * Initializes the input stream, handling of the output record batch + * and the actual reader to be used. + * @param schemaNegotiator supplies the operator context from which buffers are + * allocated, and is used to define the schema of the output record batch + */ + + @Override + public boolean open(ColumnsSchemaNegotiator schemaNegotiator) { + final OperatorContext context = schemaNegotiator.context(); + + // Note: DO NOT use managed buffers here. They remain in existence + // until the fragment is shut down. The buffers here are large. + // If we scan 1000 files, and allocate 1 MB for each, we end up + // holding onto 1 GB of memory in managed buffers. + // Instead, we allocate the buffers explicitly, and must free + // them. + + readBuffer = context.getAllocator().buffer(READ_BUFFER); + whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER); + + // TODO: Set this based on size of record rather than + // absolute count. 
+ + schemaNegotiator.setBatchSize(MAX_RECORDS_PER_BATCH); + + // setup Output, Input, and Reader + try { + TextOutput output; + + if (settings.isHeaderExtractionEnabled()) { + output = openWithHeaders(schemaNegotiator); + } else { + output = openWithoutHeaders(schemaNegotiator); + } + if (output == null) { + return false; + } + openReader(output); + return true; + } catch (final IOException e) { + throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger); + } + } + + /** + * Extract header and use that to define the reader schema. + */ + + private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException { + final String [] fieldNames = extractHeader(); + if (fieldNames == null) { + return null; + } + final TupleMetadata schema = new TupleSchema(); + for (final String colName : fieldNames) { + schema.addColumn(MetadataUtils.newScalar(colName, MinorType.VARCHAR, DataMode.REQUIRED)); + } + schemaNegotiator.setTableSchema(schema, true); + writer = schemaNegotiator.build().writer(); + return new FieldVarCharOutput(writer); + } + + /** + * When no headers, create a single array column "columns". 
+ */ + + private TextOutput openWithoutHeaders( + ColumnsSchemaNegotiator schemaNegotiator) { + final TupleMetadata schema = new TupleSchema(); + schema.addColumn(MetadataUtils.newScalar(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR, DataMode.REPEATED)); + schemaNegotiator.setTableSchema(schema, true); + writer = schemaNegotiator.build().writer(); + return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes()); + } + + private void openReader(TextOutput output) throws IOException { + logger.trace("Opening file {}", split.getPath()); + final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath()); + final TextInput input = new TextInput(settings, stream, readBuffer, + split.getStart(), split.getStart() + split.getLength()); + + // setup Reader using Input and Output + reader = new TextReader(settings, input, output, whitespaceBuffer); + reader.start(); + } + + /** + * Extracts header from text file. + * Currently it is assumed to be first line if headerExtractionEnabled is set to true + * TODO: enhance to support more common header patterns + * @return field name strings + */ + + private String [] extractHeader() throws IOException { + assert settings.isHeaderExtractionEnabled(); + + // don't skip header in case skipFirstLine is set true + settings.setSkipFirstLine(false); + + final HeaderBuilder hOutput = new HeaderBuilder(split.getPath()); + + // setup Input using InputStream + // we should read file header irrespective of split given to this reader + final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath()); + final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength()); + + // setup Reader using Input and Output + this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer); + reader.start(); + + // extract first row only + reader.parseNext(); + + // grab the field names from output + final String [] fieldNames = hOutput.getHeaders(); + + // cleanup and set 
to skip the first line next time we read input + reader.close(); + settings.setSkipFirstLine(true); + + readBuffer.clear(); + whitespaceBuffer.clear(); + return fieldNames; + } + + /** + * Generates the next record batch + * @return number of records in the batch + */ + + @Override + public boolean next() { + reader.resetForNextBatch(); + + try { + boolean more = false; + while (! writer.isFull()) { + more = reader.parseNext(); + if (! more) { + break; + } + } + reader.finishBatch(); + + // Return false on the batch that hits EOF. The scan operator + // knows to process any rows in this final batch. + + return more && writer.rowCount() > 0; + } catch (IOException | TextParsingException e) { + if (e.getCause() != null && e.getCause() instanceof UserException) { + throw (UserException) e.getCause(); + } + throw UserException.dataReadError(e) + .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.", + split.getPath(), reader.getPos()) + .build(logger); + } + } + + /** + * Cleanup state once we are finished processing all the records. + * This would internally close the input stream we are reading from. + */ + @Override + public void close() { + + // Release the buffers allocated above. Double-check to handle + // unexpected multiple calls to close(). 
+ + if (readBuffer != null) { + readBuffer.release(); + readBuffer = null; + } + if (whitespaceBuffer != null) { + whitespaceBuffer.release(); + whitespaceBuffer = null; + } + try { + if (reader != null) { + reader.close(); + reader = null; + } + } catch (final IOException e) { + logger.warn("Exception while closing stream.", e); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java new file mode 100644 index 000000000..df48a5548 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.TupleMetadata; + +/** + * Class is responsible for generating record batches for text file inputs. We generate + * a record batch with a set of varchar vectors. A varchar vector contains all the field + * values for a given column. 
Each record is a single value within each vector of the set. + */ +class FieldVarCharOutput extends BaseFieldOutput { + + /** + * We initialize and add the varchar vector for each incoming field in this + * constructor. + * @param outputMutator Used to create/modify schema + * @param fieldNames Incoming field names + * @param columns List of columns selected in the query + * @param isStarQuery boolean to indicate if all fields are selected or not + */ + public FieldVarCharOutput(RowSetLoader writer) { + super(writer, + TextReader.MAXIMUM_NUMBER_COLUMNS, + makeMask(writer)); + } + + private static boolean[] makeMask(RowSetLoader writer) { + final TupleMetadata schema = writer.tupleSchema(); + final boolean projectionMask[] = new boolean[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + projectionMask[i] = schema.metadata(i).isProjected(); + } + return projectionMask; + } + + @Override + public boolean endField() { + if (fieldProjected) { + writer.scalar(currentFieldIndex) + .setBytes(fieldBytes, currentDataPointer); + } + + return super.endField(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java new file mode 100644 index 000000000..62eafc898 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/HeaderBuilder.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.hadoop.fs.Path; + +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; + +/** + * Text output that implements a header reader/parser. + * The caller parses out the characters of each header; + * this class assembles UTF-8 bytes into Unicode characters, + * fixes invalid characters (those not legal for SQL symbols), + * and maps duplicate names to unique names. + * <p> + * That is, this class is as permissive as possible with file + * headers to avoid spurious query failures for trivial reasons. + */ + +// Note: this class uses Java heap strings and the usual Java +// convenience classes. Since we do heavy Unicode string operations, +// and read a single row, there is no good reason to try to use +// value vectors and direct memory for this task. + +public class HeaderBuilder extends TextOutput { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HeaderBuilder.class); + + /** + * Maximum Drill symbol length, as enforced for headers. 
+ * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier"> + * identifier documentation</a> + */ + // TODO: Replace with the proper constant, if available + public static final int MAX_HEADER_LEN = 1024; + + /** + * Prefix used to replace non-alphabetic characters at the start of + * a column name. For example, $foo becomes col_foo. Used + * because SQL does not allow _foo. + */ + + public static final String COLUMN_PREFIX = "col_"; + + /** + * Prefix used to create numbered columns for missing + * headers. Typical names: column_1, column_2, ... + */ + + public static final String ANONYMOUS_COLUMN_PREFIX = "column_"; + + public final Path filePath; + public final List<String> headers = new ArrayList<>(); + public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN); + + public HeaderBuilder(Path filePath) { + this.filePath = filePath; + } + + @Override + public void startField(int index) { + currentField.clear(); + } + + @Override + public boolean endField() { + String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8); + header = validateSymbol(header); + headers.add(header); + return true; + } + + @Override + public boolean endEmptyField() { + + // Empty header will be rewritten to "column_<n>". + + return endField(); + } + + /** + * Validate the header name according to the SQL lexical rules. + * @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier"> + * identifier documentation</a> + * @param header the header name to validate + */ + + // TODO: Replace with existing code, if any. + private String validateSymbol(String header) { + header = header.trim(); + + // To avoid unnecessary query failures, just make up a column name + // if the name is missing or all blanks. + + if (header.isEmpty()) { + return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1); + } + if (! 
Character.isAlphabetic(header.charAt(0))) { + return rewriteHeader(header); + } + for (int i = 1; i < header.length(); i++) { + char ch = header.charAt(i); + if (! Character.isAlphabetic(ch) && + ! Character.isDigit(ch) && ch != '_') { + return rewriteHeader(header); + } + } + return header; + } + + /** + * Given an invalid header, rewrite it to replace illegal characters + * with valid ones. The header won't be what the user specified, + * but it will be a valid SQL identifier. This solution avoids failing + * queries due to corrupted or invalid header data. + * <p> + * Names with invalid first characters are mapped to "col_". Example: + * $foo maps to col_foo. If the only character is non-alphabetic, treat + * the column as anonymous and create a generic name: column_4, etc. + * <p> + * This mapping could create a column that exceeds the maximum length + * of 1024. Since that is not really a hard limit, we just live with the + * extra few characters. + * + * @param header the original header + * @return the rewritten header, valid for SQL + */ + + private String rewriteHeader(String header) { + final StringBuilder buf = new StringBuilder(); + + // If starts with non-alphabetic, can't map the character to + // underscore, so just tack on a prefix. + + char ch = header.charAt(0); + if (Character.isAlphabetic(ch)) { + buf.append(ch); + } else if (Character.isDigit(ch)) { + buf.append(COLUMN_PREFIX); + buf.append(ch); + + // For the strange case of only one character, format + // the same as an empty header. 
+ + } else if (header.length() == 1) { + return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1); + } else { + buf.append(COLUMN_PREFIX); + } + + // Convert all remaining invalid characters to underscores + + for (int i = 1; i < header.length(); i++) { + ch = header.charAt(i); + if (Character.isAlphabetic(ch) || + Character.isDigit(ch) || ch == '_') { + buf.append(ch); + } else { + buf.append("_"); + } + } + return buf.toString(); + } + + @Override + public void append(byte data) { + + // Ensure the data fits. Note that, if the name is Unicode, the actual + // number of characters might be less than the limit even though the + // byte count exceeds the limit. Fixing this, in general, would require + // a buffer four times larger, so we leave that as a later improvement + // if ever needed. + + try { + currentField.put(data); + } catch (BufferOverflowException e) { + throw UserException.dataReadError() + .message("Column exceeds maximum length of %d", MAX_HEADER_LEN) + .addContext("File Path", filePath.toString()) + .build(logger); + } + } + + @Override + public void finishRecord() { + if (headers.isEmpty()) { + throw UserException.dataReadError() + .message("The file must define at least one header.") + .addContext("File Path", filePath.toString()) + .build(logger); + } + + // Force headers to be unique. + + final Set<String> idents = new HashSet<String>(); + for (int i = 0; i < headers.size(); i++) { + String header = headers.get(i); + String key = header.toLowerCase(); + + // Is the header a duplicate? + + if (idents.contains(key)) { + + // Make header unique by appending a suffix. + // This loop must end because we have a finite + // number of headers. + // The original column is assumed to be "1", so + // the first duplicate is "2", and so on. + // Note that this will map columns of the form: + // "col,col,col_2,col_2_2" to + // "col", "col_2", "col_2_2", "col_2_2_2". + // No mapping scheme is perfect... 
+ + for (int l = 2; ; l++) { + final String rewritten = header + "_" + l; + key = rewritten.toLowerCase(); + if (! idents.contains(key)) { + headers.set(i, rewritten); + break; + } + } + } + idents.add(key); + } + } + + @Override + public void startRecord() { } + + public String[] getHeaders() { + + // Just return the headers: any needed checks were done in + // finishRecord() + + final String array[] = new String[headers.size()]; + return headers.toArray(array); + } + + // Not used. + @Override + public long getRecordCount() { return 0; } + + @Override + public boolean isFull() { return false; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java new file mode 100644 index 000000000..13b44509e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; + +/** + * Class is responsible for generating record batches for text file inputs. We generate + * a record batch with a single vector of type repeated varchar vector. Each record is a single + * value within the vector containing all the fields in the record as individual array elements. + */ +public class RepeatedVarCharOutput extends BaseFieldOutput { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class); + + private final ScalarWriter columnWriter; + private final ArrayWriter arrayWriter; + + /** + * Provide the row set loader (which must have just one repeated Varchar + * column) and an optional array projection mask. + * @param projectionMask + * @param tupleLoader + */ + + public RepeatedVarCharOutput(RowSetLoader loader, boolean[] projectionMask) { + super(loader, + maxField(loader, projectionMask), + projectionMask); + arrayWriter = writer.array(0); + columnWriter = arrayWriter.scalar(); + } + + private static int maxField(RowSetLoader loader, boolean[] projectionMask) { + + // If the one and only field (`columns`) is not selected, then this + // is a COUNT(*) or similar query. Select nothing. + + if (! loader.tupleSchema().metadata(0).isProjected()) { + return -1; + } + + // If this is SELECT * or SELECT `columns` query, project all + // possible fields. + + if (projectionMask == null) { + return TextReader.MAXIMUM_NUMBER_COLUMNS; + } + + // Else, this is a SELECT columns[x], columns[y], ... query. + // Project only the requested element members (fields). + + int end = projectionMask.length - 1; + while (end >= 0 && ! 
projectionMask[end]) { + end--; + } + return end; + } + + /** + * Write the value into an array position. Rules: + * <ul> + * <li>If there is no projection mask, collect all columns.</li> + * <li>If a selection mask is present, we previously found the index + * of the last projection column (<tt>maxField</tt>). If the current + * column is beyond that number, ignore the data and stop accepting + * columns.</li> + * <li>If the column is projected, add the data to the array.</li> + * <li>If the column is not projected, add a blank value to the + * array.</li> + * </ul> + * The above ensures that we leave no holes in the portion of the + * array that is projected (by adding blank columns where needed), + * and we just ignore columns past the end of the projected part + * of the array. (No need to fill holes at the end.) + */ + + @Override + public boolean endField() { + + // Skip the field if past the set of projected fields. + + if (currentFieldIndex > maxField) { + return false; + } + + // If the field is projected, save it. + + if (fieldProjected) { + + // Repeated var char will create as many entries as there are columns. + // If this would exceed the maximum, issue an error. Note that we do + // this only if all fields are selected; the same query will succeed if + // the user does a COUNT(*) or SELECT columns[x], columns[y], ... + + if (currentFieldIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) { + throw UserException + .unsupportedError() + .message("Text file contains too many fields") + .addContext("Limit", TextReader.MAXIMUM_NUMBER_COLUMNS) + .build(logger); + } + + // Save the field. + + columnWriter.setBytes(fieldBytes, currentDataPointer); + } else { + + // The field is not projected. + // Must write a value into this array position, but + // the value should be empty. + + columnWriter.setBytes(fieldBytes, 0); + } + + // Return whether the rest of the fields should be read. 
+ + return super.endField(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java new file mode 100644 index 000000000..70c43b7ee --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/StreamFinishedPseudoException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +class StreamFinishedPseudoException extends RuntimeException { + + public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException(); + + private StreamFinishedPseudoException() { + super("", null, false, true); + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java new file mode 100644 index 000000000..26fade6d7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.drill.exec.store.easy.text.compliant.v3;

import io.netty.buffer.DrillBuf;
import io.netty.util.internal.PlatformDependent;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.CompressionInputStream;

import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;

import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;

/**
 * Class that fronts an InputStream to provide a byte consumption interface.
 * Also manages only reading lines to and from each split.
 */
final class TextInput {

  // Line separator byte sequence and the single byte that replaces it,
  // both taken from the settings in the constructor.
  private final byte[] lineSeparator;
  private final byte normalizedLineSeparator;
  private final TextParsingSettingsV3 settings;

  private long lineCount;
  private long charCount;

  /**
   * The starting position in the file.
   */
  private final long startPos;
  private final long endPos;

  // Stream position captured at the start of the most recent buffer refill.
  private long streamPos;

  // The same stream seen through three types; inputFS is null when the
  // stream is not an FSDataInputStream.
  private final Seekable seekable;
  private final FSDataInputStream inputFS;
  private final InputStream input;

  private final DrillBuf buffer;

  // NIO view over the whole capacity of buffer.
  private final ByteBuffer underlyingBuffer;

  // Memory address of buffer, and that address minus one. bufferPtr is
  // used as a one-based offset: bytes are read at bStartMinus1 + bufferPtr.
  private final long bStart;
  private final long bStartMinus1;

  // True when the wrapped stream of inputFS implements ByteBufferReadable.
  private final boolean bufferReadable;

  /**
   * Whether there was a possible partial line separator on the previous
   * read so we dropped it and it should be appended to next read.
   * Holds -1 when there is nothing to carry over.
   */
  private int remByte = -1;

  /**
   * The current position in the buffer.
   */
  public int bufferPtr;

  /**
   * The quantity of valid data in the buffer. -1 means no data is available.
   */
  public int length = -1;

  // Set once the line separator that ends this split has been found.
  private boolean endFound = false;

  /**
   * Creates a new instance. The line separator sequence and the normalized
   * newline byte that replaces it are taken from the settings.
   *
   * @param settings parsing settings
   * @param input stream to read; must implement {@link Seekable}
   * @param readBuffer buffer that receives the bytes read from the stream
   * @param startPos position at which reading starts; must be 0 for a
   * compressed stream
   * @param endPos position at which the split ends; replaced by
   * {@code Long.MAX_VALUE} for a compressed stream when greater than 0
   * @throws IllegalArgumentException if the stream is not seekable, or is
   * compressed and {@code startPos} is not 0
   */
  public TextInput(TextParsingSettingsV3 settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
    this.lineSeparator = settings.getNewLineDelimiter();
    byte normalizedLineSeparator = settings.getNormalizedNewLine();
    Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
    boolean isCompressed = input instanceof CompressionInputStream;
    Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");

    // splits aren't allowed with compressed data. The split length will be the compressed size which means we'll normally end prematurely.
    if (isCompressed && endPos > 0) {
      endPos = Long.MAX_VALUE;
    }

    this.input = input;
    this.seekable = (Seekable) input;
    this.settings = settings;

    if (input instanceof FSDataInputStream) {
      this.inputFS = (FSDataInputStream) input;
      this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
    } else {
      this.inputFS = null;
      this.bufferReadable = false;
    }

    this.startPos = startPos;
    this.endPos = endPos;

    this.normalizedLineSeparator = normalizedLineSeparator;

    this.buffer = readBuffer;
    this.bStart = buffer.memoryAddress();
    this.bStartMinus1 = bStart -1;
    this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
  }

  /**
   * Positions the input for reading: seeks to the start position when it is
   * non-zero and fills the buffer. If data was read and either the start
   * position is non-zero or the settings ask to skip the first line, the
   * input moves past the next line separator.
   * @throws IOException for input file read errors
   */
  final void start() throws IOException {
    lineCount = 0;
    if(startPos > 0){
      seekable.seek(startPos);
    }

    updateBuffer();
    if (length > 0) {
      if (startPos > 0 || settings.isSkipFirstLine()) {

        // move to next full record.
        skipLines(1);
      }
    }
  }


  /**
   * Helper method to get the most recent characters consumed since the last record started.
   * May get an incomplete string since we don't support stream rewind. Returns a fixed
   * placeholder string for now.
   *
   * @return String of last few bytes.
   * @throws IOException for input file read errors
   */
  public String getStringSinceMarkForError() throws IOException {
    return " ";
  }

  /**
   * @return stream position of the last buffer refill plus the current
   * buffer pointer
   */
  long getPos() {
    return streamPos + bufferPtr;
  }

  // Intentionally does nothing.
  public void mark() { }

  /**
   * Read some more bytes from the stream. Uses the zero copy interface if available.
   * Otherwise, does byte copy. Leading bytes of the line separator held back
   * by the previous read are written to the front of the buffer first.
   *
   * @throws IOException for input file read errors
   */
  private void read() throws IOException {
    if (bufferReadable) {

      if (remByte != -1) {
        for (int i = 0; i <= remByte; i++) {
          underlyingBuffer.put(lineSeparator[i]);
        }
        remByte = -1;
      }

      // NOTE(review): length is the count returned by this read; it does
      // not include the separator bytes put just above - confirm intended.
      length = inputFS.read(underlyingBuffer);

    } else {
      byte[] b = new byte[underlyingBuffer.capacity()];
      if (remByte != -1){
        int remBytesNum = remByte + 1;
        System.arraycopy(lineSeparator, 0, b, 0, remBytesNum);
        length = input.read(b, remBytesNum, b.length - remBytesNum);
        remByte = -1;
      } else {
        length = input.read(b);
      }

      // The whole array is copied; length says how much of it is valid.
      underlyingBuffer.put(b);
    }
  }


  /**
   * Read more data into the buffer. Will also manage split end conditions.
   * After the call the buffer pointer is back at 1; once the end of the
   * split has been found, no more data is read and length is set to -1.
   *
   * @throws IOException for input file read errors
   */
  private void updateBuffer() throws IOException {
    streamPos = seekable.getPos();
    underlyingBuffer.clear();

    if (endFound) {
      length = -1;
      return;
    }

    read();

    // check our data read allowance.
    if (streamPos + length >= this.endPos) {
      updateLengthBasedOnConstraint();
    }

    // Add what was consumed from the previous buffer to the running total
    // before the pointer is reset.
    charCount += bufferPtr;
    bufferPtr = 1;

    buffer.writerIndex(underlyingBuffer.limit());
    buffer.readerIndex(underlyingBuffer.position());
  }

  /**
   * Checks to see if we can go over the end of our bytes constraint on the data. If so,
   * adjusts so that we can only read to the last character of the first line that crosses
   * the split boundary.
   */
  private void updateLengthBasedOnConstraint() {
    final long max = bStart + length;

    // Scan from the buffer address that corresponds to endPos up to the
    // end of the data just read.
    for (long m = bStart + (endPos - streamPos); m < max; m++) {
      for (int i = 0; i < lineSeparator.length; i++) {
        long mPlus = m + i;
        if (mPlus < max) {
          // we found a line separator and don't need to consult the next byte.
          if (lineSeparator[i] == PlatformDependent.getByte(mPlus) && i == lineSeparator.length - 1) {
            length = (int) (mPlus - bStart) + 1;
            endFound = true;
            return;
          }
        } else {
          // the last N characters of the read were remnant bytes. We'll hold off on dealing with these bytes until the next read.
          remByte = i;
          length = length - i;
          return;
        }
      }
    }
  }

  /**
   * Get next byte from stream. Also maintains the current line count. Will throw a
   * {@link StreamFinishedPseudoException} when the stream has run out of bytes.
   * When the byte starts a complete line separator, the normalized newline
   * byte is returned and the rest of the separator is skipped.
   *
   * @return next byte from stream.
   * @throws IOException for input file read errors
   */
  public final byte nextChar() throws IOException {
    byte byteChar = nextCharNoNewLineCheck();
    int bufferPtrTemp = bufferPtr - 1;
    if (byteChar == lineSeparator[0]) {

      // Compare the remaining separator bytes with the bytes that follow;
      // any mismatch means this is an ordinary byte.
      for (int i = 1; i < lineSeparator.length; i++, bufferPtrTemp++) {
        if (lineSeparator[i] != buffer.getByte(bufferPtrTemp)) {
          return byteChar;
        }
      }

      lineCount++;
      byteChar = normalizedLineSeparator;

      // we don't need to update buffer position if line separator is one byte long
      if (lineSeparator.length > 1) {
        bufferPtr += (lineSeparator.length - 1);
        if (bufferPtr >= length) {
          if (length != -1) {
            updateBuffer();
          } else {
            throw StreamFinishedPseudoException.INSTANCE;
          }
        }
      }
    }

    return byteChar;
  }

  /**
   * Get next byte from stream. Does not maintain any line count. Will throw a
   * {@link StreamFinishedPseudoException} when the stream has run out of bytes.
   *
   * @return next byte from stream.
   * @throws IOException for input file read errors
   */
  public final byte nextCharNoNewLineCheck() throws IOException {

    if (length == -1) {
      throw StreamFinishedPseudoException.INSTANCE;
    }

    rangeCheck(buffer, bufferPtr - 1, bufferPtr);

    byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr);

    // The byte just read was the last valid one: refill now. updateBuffer()
    // leaves the pointer at 1, so step back to cancel the increment below.
    if (bufferPtr >= length) {
      if (length != -1) {
        updateBuffer();
        bufferPtr--;
      } else {
        throw StreamFinishedPseudoException.INSTANCE;
      }
    }

    bufferPtr++;
    return byteChar;
  }

  /**
   * Number of lines read since the start of this split.
   * @return current line count
   */
  public final long lineCount() {
    return lineCount;
  }

  /**
   * Skip forward the number of line delimiters. If you are in the middle of a line,
   * a value of 1 will skip to the start of the next record. Values below 1
   * do nothing.
   *
   * @param lines Number of lines to skip.
   * @throws IOException for input file read errors
   * @throws IllegalArgumentException if unable to skip the requested number
   * of lines
   */
  public final void skipLines(int lines) throws IOException {
    if (lines < 1) {
      return;
    }
    long expectedLineCount = this.lineCount + lines;

    try {
      do {
        nextChar();
      } while (lineCount < expectedLineCount);

      // NOTE(review): the loop above only exits once lineCount has reached
      // expectedLineCount, and this check compares against lines rather
      // than expectedLineCount - confirm whether it can ever fire.
      if (lineCount < lines) {
        throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
      }
    } catch (EOFException ex) {
      throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
    }
  }

  /**
   * @return running character total from earlier buffers plus the current
   * buffer pointer
   */
  public final long charCount() {
    return charCount + bufferPtr;
  }

  /**
   * @return current line count; same value as {@link #lineCount()}
   */
  public long getLineCount() {
    return lineCount;
  }

  /**
   * Closes the underlying input stream.
   * @throws IOException if the stream fails to close
   */
  public void close() throws IOException{
    input.close();
  }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java new file mode 100644 index 000000000..48c184991 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextOutput.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + + +/** + * Base class for producing output record batches while dealing with + * text files. Defines the interface called from text parsers to create + * the corresponding value vectors (record batch). + */ + +abstract class TextOutput { + + public abstract void startRecord(); + + /** + * Start processing a new field within a record. + * @param index index within the record + */ + public abstract void startField(int index); + + /** + * End processing a field within a record. + * @return true if engine should continue processing record. false if rest of record can be skipped. + */ + public abstract boolean endField(); + + /** + * Shortcut that lets the output know that we are closing ending a field with no data. + * @return true if engine should continue processing record. false if rest of record can be skipped. + */ + public abstract boolean endEmptyField(); + + /** + * Add the provided data but drop any whitespace. + * @param data character to append + */ + public void appendIgnoringWhitespace(byte data) { + if (TextReader.isWhite(data)) { + // noop + } else { + append(data); + } + } + + /** + * Appends a byte to the output character data buffer + * @param data current byte read + */ + public abstract void append(byte data); + + /** + * Completes the processing of a given record. Also completes the processing of the + * last field being read. 
+ */ + public abstract void finishRecord(); + + /** + * Return the total number of records (across batches) processed + */ + public abstract long getRecordCount(); + + /** + * Indicates if the current batch is full and reading for this batch + * should stop. + * + * @return true if the batch is full and the reader must exit to allow + * the batch to be sent downstream, false if the reader may continue to + * add rows to the current batch + */ + + public abstract boolean isFull(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java new file mode 100644 index 000000000..86cad4c88 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingContext.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import java.io.IOException; + +import com.univocity.parsers.common.ParsingContext; + +class TextParsingContext implements ParsingContext { + + private final TextInput input; + private final TextOutput output; + protected boolean stopped; + + private int[] extractedIndexes; + + public TextParsingContext(TextInput input, TextOutput output) { + this.input = input; + this.output = output; + } + + /** + * {@inheritDoc} + */ + @Override + public void stop() { + stopped = true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isStopped() { + return stopped; + } + + /** + * {@inheritDoc} + */ + @Override + public long currentLine() { + return input.lineCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public long currentChar() { + return input.charCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public int currentColumn() { + return -1; + } + + /** + * {@inheritDoc} + */ + @Override + public String[] headers() { + return new String[]{}; + } + + /** + * {@inheritDoc} + */ + @Override + public int[] extractedFieldIndexes() { + return extractedIndexes; + } + + /** + * {@inheritDoc} + */ + @Override + public long currentRecord() { + return output.getRecordCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public String currentParsedContent() { + try { + return input.getStringSinceMarkForError(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void skipLines(int lines) { + } + + @Override + public boolean columnsReordered() { + return false; + } + + public boolean isFull() { + return output.isFull(); + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java new file mode 100644 index 000000000..0341b4554 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextParsingSettingsV3.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig; + +import org.apache.drill.shaded.guava.com.google.common.base.Charsets; + +// TODO: Remove the "V3" suffix once the V2 version is retired. 
+public class TextParsingSettingsV3 { + + public static final TextParsingSettingsV3 DEFAULT = new TextParsingSettingsV3(); + + private String emptyValue = null; + private boolean parseUnescapedQuotes = true; + private byte quote = b('"'); + private byte quoteEscape = b('"'); + private byte delimiter = b(','); + private byte comment = b('#'); + + private long maxCharsPerColumn = Character.MAX_VALUE; + private byte normalizedNewLine = b('\n'); + private byte[] newLineDelimiter = {normalizedNewLine}; + private boolean ignoreLeadingWhitespaces; + private boolean ignoreTrailingWhitespaces; + private String lineSeparatorString = "\n"; + private boolean skipFirstLine; + + private boolean headerExtractionEnabled; + private boolean useRepeatedVarChar = true; + + public void set(TextFormatConfig config){ + this.quote = bSafe(config.getQuote(), "quote"); + this.quoteEscape = bSafe(config.getEscape(), "escape"); + this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8); + this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter"); + this.comment = bSafe(config.getComment(), "comment"); + this.skipFirstLine = config.isSkipFirstLine(); + this.headerExtractionEnabled = config.isHeaderExtractionEnabled(); + if (this.headerExtractionEnabled) { + // In case of header TextRecordReader will use set of VarChar vectors vs RepeatedVarChar + this.useRepeatedVarChar = false; + } + } + + public byte getComment() { + return comment; + } + + public boolean isSkipFirstLine() { + return skipFirstLine; + } + + public void setSkipFirstLine(boolean skipFirstLine) { + this.skipFirstLine = skipFirstLine; + } + + public boolean isUseRepeatedVarChar() { + return useRepeatedVarChar; + } + + public void setUseRepeatedVarChar(boolean useRepeatedVarChar) { + this.useRepeatedVarChar = useRepeatedVarChar; + } + + private static byte bSafe(char c, String name) { + if (c > Byte.MAX_VALUE) { + throw new IllegalArgumentException(String.format("Failure validating configuration option 
%s. Expected a " + + "character between 0 and 127 but value was actually %d.", name, (int) c)); + } + return (byte) c; + } + + private static byte b(char c) { + return (byte) c; + } + + public byte[] getNewLineDelimiter() { + return newLineDelimiter; + } + + /** + * Returns the character used for escaping values where the field delimiter is + * part of the value. Defaults to '"' + * + * @return the quote character + */ + public byte getQuote() { + return quote; + } + + /** + * Defines the character used for escaping values where the field delimiter is + * part of the value. Defaults to '"' + * + * @param quote + * the quote character + */ + public void setQuote(byte quote) { + this.quote = quote; + } + + public String getLineSeparatorString() { + return lineSeparatorString; + } + + /** + * Identifies whether or not a given character is used for escaping values + * where the field delimiter is part of the value + * + * @param ch + * the character to be verified + * @return true if the given character is the character used for escaping + * values, false otherwise + */ + public boolean isQuote(byte ch) { + return this.quote == ch; + } + + /** + * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"' + * @return the quote escape character + */ + public byte getQuoteEscape() { + return quoteEscape; + } + + /** + * Defines the character used for escaping quotes inside an already quoted + * value. Defaults to '"' + * + * @param quoteEscape + * the quote escape character + */ + public void setQuoteEscape(byte quoteEscape) { + this.quoteEscape = quoteEscape; + } + + /** + * Identifies whether or not a given character is used for escaping quotes + * inside an already quoted value. 
+ * + * @param ch + * the character to be verified + * @return true if the given character is the quote escape character, false + * otherwise + */ + public boolean isQuoteEscape(byte ch) { + return this.quoteEscape == ch; + } + + /** + * Returns the field delimiter character. Defaults to ',' + * @return the field delimiter character + */ + public byte getDelimiter() { + return delimiter; + } + + /** + * Defines the field delimiter character. Defaults to ',' + * @param delimiter the field delimiter character + */ + public void setDelimiter(byte delimiter) { + this.delimiter = delimiter; + } + + /** + * Identifies whether or not a given character represents a field delimiter + * @param ch the character to be verified + * @return true if the given character is the field delimiter character, false otherwise + */ + public boolean isDelimiter(byte ch) { + return this.delimiter == ch; + } + + /** + * Returns the String representation of an empty value (defaults to null) + * + * <p> + * When reading, if the parser does not read any character from the input, and + * the input is within quotes, the empty is used instead of an empty string + * + * @return the String representation of an empty value + */ + public String getEmptyValue() { + return emptyValue; + } + + /** + * Sets the String representation of an empty value (defaults to null) + * + * <p> + * When reading, if the parser does not read any character from the input, and + * the input is within quotes, the empty is used instead of an empty string + * + * @param emptyValue + * the String representation of an empty value + */ + public void setEmptyValue(String emptyValue) { + this.emptyValue = emptyValue; + } + + /** + * Indicates whether the CSV parser should accept unescaped quotes inside + * quoted values and parse them normally. Defaults to {@code true}. + * + * @return a flag indicating whether or not the CSV parser should accept + * unescaped quotes inside quoted values. 
+ */ + public boolean isParseUnescapedQuotes() { + return parseUnescapedQuotes; + } + + /** + * Configures how to handle unescaped quotes inside quoted values. If set to + * {@code true}, the parser will parse the quote normally as part of the + * value. If set the {@code false}, a + * {@link com.univocity.parsers.common.TextParsingException} will be thrown. + * Defaults to {@code true}. + * + * @param parseUnescapedQuotes + * indicates whether or not the CSV parser should accept unescaped + * quotes inside quoted values. + */ + public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) { + this.parseUnescapedQuotes = parseUnescapedQuotes; + } + + /** + * Indicates whether or not the first valid record parsed from the input + * should be considered as the row containing the names of each column + * + * @return true if the first valid record parsed from the input should be + * considered as the row containing the names of each column, false + * otherwise + */ + public boolean isHeaderExtractionEnabled() { + return headerExtractionEnabled; + } + + /** + * Defines whether or not the first valid record parsed from the input should + * be considered as the row containing the names of each column + * + * @param headerExtractionEnabled + * a flag indicating whether the first valid record parsed from the + * input should be considered as the row containing the names of each + * column + */ + public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) { + this.headerExtractionEnabled = headerExtractionEnabled; + } + + public long getMaxCharsPerColumn() { + return maxCharsPerColumn; + } + + public void setMaxCharsPerColumn(long maxCharsPerColumn) { + this.maxCharsPerColumn = maxCharsPerColumn; + } + + public void setComment(byte comment) { + this.comment = comment; + } + + public byte getNormalizedNewLine() { + return normalizedNewLine; + } + + public void setNormalizedNewLine(byte normalizedNewLine) { + this.normalizedNewLine = normalizedNewLine; + } + + 
public boolean isIgnoreLeadingWhitespaces() { + return ignoreLeadingWhitespaces; + } + + public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) { + this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces; + } + + public boolean isIgnoreTrailingWhitespaces() { + return ignoreTrailingWhitespaces; + } + + public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) { + this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java new file mode 100644 index 000000000..17a076c0c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextReader.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant.v3; + +import io.netty.buffer.DrillBuf; + +import java.io.IOException; + +import org.apache.drill.common.exceptions.UserException; + +import com.univocity.parsers.common.TextParsingException; + +/******************************************************************************* + * Portions Copyright 2014 uniVocity Software Pty Ltd + ******************************************************************************/ + +/** + * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and + * DrillBuf support. + */ +public final class TextReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class); + + private static final byte NULL_BYTE = (byte) '\0'; + + public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024; + + private final TextParsingContext context; + + private final TextParsingSettingsV3 settings; + + private final TextInput input; + private final TextOutput output; + private final DrillBuf workBuf; + + private byte ch; + + // index of the field within this record + private int fieldIndex; + + /** Behavior settings **/ + private final boolean ignoreTrailingWhitespace; + private final boolean ignoreLeadingWhitespace; + private final boolean parseUnescapedQuotes; + + /** Key Characters **/ + private final byte comment; + private final byte delimiter; + private final byte quote; + private final byte quoteEscape; + private final byte newLine; + + /** + * The CsvParser supports all settings provided by {@link TextParsingSettingsV3}, + * and requires this configuration to be properly initialized. 
+ * @param settings the parser configuration + * @param input input stream + * @param output interface to produce output record batch + * @param workBuf working buffer to handle whitespace + */ + public TextReader(TextParsingSettingsV3 settings, TextInput input, TextOutput output, DrillBuf workBuf) { + this.context = new TextParsingContext(input, output); + this.workBuf = workBuf; + this.settings = settings; + + this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces(); + this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces(); + this.parseUnescapedQuotes = settings.isParseUnescapedQuotes(); + this.delimiter = settings.getDelimiter(); + this.quote = settings.getQuote(); + this.quoteEscape = settings.getQuoteEscape(); + this.newLine = settings.getNormalizedNewLine(); + this.comment = settings.getComment(); + + this.input = input; + this.output = output; + } + + public TextOutput getOutput() { return output; } + + /** + * Check if the given byte is a white space. As per the univocity text reader + * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed + * we have an additional check to make sure its not negative + */ + static final boolean isWhite(byte b){ + return b <= ' ' && b > -1; + } + + /** + * Inform the output interface to indicate we are starting a new record batch + */ + public void resetForNextBatch() { } + + public long getPos() { return input.getPos(); } + + /** + * Function encapsulates parsing an entire record, delegates parsing of the + * fields to parseField() function. 
+ * We mark the start of the record and if there are any failures encountered (OOM for eg) + * then we reset the input stream to the marked position + * @return true if parsing this record was successful; false otherwise + * @throws IOException for input file read errors + */ + private boolean parseRecord() throws IOException { + final byte newLine = this.newLine; + final TextInput input = this.input; + + input.mark(); + + fieldIndex = 0; + if (ignoreLeadingWhitespace && isWhite(ch)) { + skipWhitespace(); + } + + output.startRecord(); + int fieldsWritten = 0; + try { + @SuppressWarnings("unused") + boolean earlyTerm = false; + while (ch != newLine) { + earlyTerm = ! parseField(); + fieldsWritten++; + if (ch != newLine) { + ch = input.nextChar(); + if (ch == newLine) { + output.startField(fieldsWritten++); + output.endEmptyField(); + break; + } + } + + // Disabling early termination. See DRILL-5914 + +// if (earlyTerm) { +// if (ch != newLine) { +// input.skipLines(1); +// } +// break; +// } + } + output.finishRecord(); + } catch (StreamFinishedPseudoException e) { + + // if we've written part of a field or all of a field, we should send this row. + + if (fieldsWritten == 0) { + throw e; + } else { + output.finishRecord(); + } + } + return true; + } + + /** + * Function parses an individual field and ignores any white spaces encountered + * by not appending it to the output vector + * @throws IOException for input file read errors + */ + private void parseValueIgnore() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + appendIgnoringWhitespace(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + public void appendIgnoringWhitespace(byte data) { + if (! 
isWhite(data)) { + output.append(data); + } + } + + /** + * Function parses an individual field and appends all characters till the delimeter (or newline) + * to the output, including white spaces + * @throws IOException for input file read errors + */ + private void parseValueAll() throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + + byte ch = this.ch; + while (ch != delimiter && ch != newLine) { + output.append(ch); + ch = input.nextChar(); + } + this.ch = ch; + } + + /** + * Function simply delegates the parsing of a single field to the actual + * implementation based on parsing config + * + * @throws IOException + * for input file read errors + */ + private void parseValue() throws IOException { + if (ignoreTrailingWhitespace) { + parseValueIgnore(); + } else { + parseValueAll(); + } + } + + /** + * Recursive function invoked when a quote is encountered. Function also + * handles the case when there are non-white space characters in the field + * after the quoted value. + * @param prev previous byte read + * @throws IOException for input file read errors + */ + private void parseQuotedValue(byte prev) throws IOException { + final byte newLine = this.newLine; + final byte delimiter = this.delimiter; + final TextOutput output = this.output; + final TextInput input = this.input; + final byte quote = this.quote; + + ch = input.nextCharNoNewLineCheck(); + + while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) { + if (ch != quote) { + if (prev == quote) { // unescaped quote detected + if (parseUnescapedQuotes) { + output.append(quote); + output.append(ch); + parseQuotedValue(ch); + break; + } else { + throw new TextParsingException( + context, + "Unescaped quote character '" + + quote + + "' inside quoted value of CSV field. 
To allow unescaped quotes, " + + "set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. " + + "Cannot parse CSV input."); + } + } + output.append(ch); + prev = ch; + } else if (prev == quoteEscape) { + output.append(quote); + prev = NULL_BYTE; + } else { + prev = ch; + } + ch = input.nextCharNoNewLineCheck(); + } + + // Handles whitespace after quoted value: + // Whitespace are ignored (i.e., ch <= ' ') if they are not used as delimiters (i.e., ch != ' ') + // For example, in tab-separated files (TSV files), '\t' is used as delimiter and should not be ignored + // Content after whitespace may be parsed if 'parseUnescapedQuotes' is enabled. + if (ch != newLine && ch <= ' ' && ch != delimiter) { + final DrillBuf workBuf = this.workBuf; + workBuf.resetWriterIndex(); + do { + // saves whitespace after value + workBuf.writeByte(ch); + ch = input.nextChar(); + // found a new line, go to next record. + if (ch == newLine) { + return; + } + } while (ch <= ' ' && ch != delimiter); + + // there's more stuff after the quoted value, not only empty spaces. + if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) { + + output.append(quote); + for(int i =0; i < workBuf.writerIndex(); i++){ + output.append(workBuf.getByte(i)); + } + // the next character is not the escape character, put it there + if (ch != quoteEscape) { + output.append(ch); + } + // sets this character as the previous character (may be escaping) + // calls recursively to keep parsing potentially quoted content + parseQuotedValue(ch); + } + } + + if (!(ch == delimiter || ch == newLine)) { + throw new TextParsingException(context, "Unexpected character '" + ch + + "' following quoted value of CSV field. Expecting '" + delimiter + "'. 
Cannot parse CSV input."); + } + } + + /** + * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function + * @return true if more rows can be read, false if not + * @throws IOException for input file read errors + */ + private final boolean parseField() throws IOException { + + output.startField(fieldIndex++); + + if (isWhite(ch) && ignoreLeadingWhitespace) { + skipWhitespace(); + } + + // Delimiter? Then this is an empty field. + + if (ch == delimiter) { + return output.endEmptyField(); + } + + // Have the first character of the field. Parse and save the + // field, even if we hit EOF. (An EOF identifies a last line + // that contains data, but is not terminated with a newline.) + + try { + if (ch == quote) { + parseQuotedValue(NULL_BYTE); + } else { + parseValue(); + } + return output.endField(); + } catch (StreamFinishedPseudoException e) { + return output.endField(); + } + } + + /** + * Helper function to skip white spaces occurring at the current input stream. + * @throws IOException for input file read errors + */ + private void skipWhitespace() throws IOException { + final byte delimiter = this.delimiter; + final byte newLine = this.newLine; + final TextInput input = this.input; + + while (isWhite(ch) && ch != delimiter && ch != newLine) { + ch = input.nextChar(); + } + } + + /** + * Starting point for the reader. Sets up the input interface. + * @throws IOException for input file read errors + */ + public final void start() throws IOException { + context.stopped = false; + input.start(); + } + + /** + * Parses the next record from the input. Will skip the line if its a comment, + * this is required when the file contains headers + * @throws IOException for input file read errors + */ + public final boolean parseNext() throws IOException { + try { + while (! 
context.stopped) { + ch = input.nextChar(); + if (ch == comment) { + input.skipLines(1); + continue; + } + break; + } + final long initialLineNumber = input.lineCount(); + boolean success = parseRecord(); + if (initialLineNumber + 1 < input.lineCount()) { + throw new TextParsingException(context, "Cannot use newline character within quoted string"); + } + + return success; + } catch (UserException ex) { + stopParsing(); + throw ex; + } catch (StreamFinishedPseudoException ex) { + stopParsing(); + return false; + } catch (Exception ex) { + try { + throw handleException(ex); + } finally { + stopParsing(); + } + } + } + + private void stopParsing() { } + + private String displayLineSeparators(String str, boolean addNewLine) { + if (addNewLine) { + if (str.contains("\r\n")) { + str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t"); + } else if (str.contains("\n")) { + str = str.replaceAll("\\n", "[\\\\n]\n\t"); + } else { + str = str.replaceAll("\\r", "[\\\\r]\r\t"); + } + } else { + str = str.replaceAll("\\n", "\\\\n"); + str = str.replaceAll("\\r", "\\\\r"); + } + return str; + } + + /** + * Helper method to handle exceptions caught while processing text files and generate better error messages associated with + * the exception. + * @param ex Exception raised + * @throws IOException for input file read errors + */ + private TextParsingException handleException(Exception ex) throws IOException { + + if (ex instanceof TextParsingException) { + throw (TextParsingException) ex; + } + + String message = null; + String tmp = input.getStringSinceMarkForError(); + char[] chars = tmp.toCharArray(); + if (chars != null) { + int length = chars.length; + if (length > settings.getMaxCharsPerColumn()) { + message = "Length of parsed input (" + length + + ") exceeds the maximum number of characters defined in your parser settings (" + + settings.getMaxCharsPerColumn() + "). 
"; + } + + if (tmp.contains("\n") || tmp.contains("\r")) { + tmp = displayLineSeparators(tmp, true); + String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false); + message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '" + + lineSeparator + "'. Parsed content:\n\t" + tmp; + } + + int nullCharacterCount = 0; + // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError + int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length; + StringBuilder s = new StringBuilder(maxLength); + for (int i = 0; i < maxLength; i++) { + if (chars[i] == '\0') { + s.append('\\'); + s.append('0'); + nullCharacterCount++; + } else { + s.append(chars[i]); + } + } + tmp = s.toString(); + + if (nullCharacterCount > 0) { + message += "\nIdentified " + + nullCharacterCount + + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t" + + tmp; + } + } + + UserException.Builder builder; + if (ex instanceof UserException) { + builder = ((UserException) ex).rebuild(); + } else { + builder = UserException + .dataReadError(ex) + .message(message); + } + throw builder + .addContext("Line", context.currentLine()) + .addContext("Record", context.currentRecord()) + .build(logger); + } + + /** + * Finish the processing of a batch, indicates to the output + * interface to wrap up the batch + */ + public void finishBatch() { } + + /** + * Invoked once there are no more records and we are done with the + * current record reader to clean up state. 
+ * @throws IOException for input file read errors + */ + public void close() throws IOException { + input.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java new file mode 100644 index 000000000..aced5adfd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Version 3 of the text reader. Hosts the "compliant" text reader on + * the row set framework. 
+ */ +package org.apache.drill.exec.store.easy.text.compliant.v3; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index a379db175..37b8a4073 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -593,7 +593,6 @@ public class Foreman implements Runnable { new BasicOptimizer.BasicOptimizationContext(queryContext), plan); } - /** * Manages the end-state processing for Foreman. * @@ -726,7 +725,6 @@ public class Foreman implements Runnable { } } - @SuppressWarnings("resource") @Override public void close() { Preconditions.checkState(!isClosed); |