author     Paul Rogers <progers@cloudera.com>    2019-03-03 17:55:22 -0800
committer  Vitalii Diravka <vitalii.diravka@gmail.com>    2019-03-05 16:29:42 +0200
commit     7e3b45967dbb97da18ba49a2fa6a67a48e33b092 (patch)
tree       3f46493d29571675fbe3d4e839cbb7f15311fa08
parent     2c3e2de2f94fd3f21a11c22b7944b94953e4f397 (diff)
DRILL-7074: Scan framework fixes and enhancements
Roll-up of fixes and enhancements that emerged from the effort to host the CSV reader on the new framework. closes #1676
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java | 26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java | 20
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java | 42
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java | 52
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java | 71
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java | 34
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java | 37
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java | 76
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java | 15
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java | 26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java | 85
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java | 230
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java | 66
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java | 241
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java | 50
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java | 44
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java | 172
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java | 19
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java | 46
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java | 153
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java | 170
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java | 4
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java | 11
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java | 281
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java | 459
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java | 3
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java | 223
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java | 11
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java | 255
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java | 6
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java | 77
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java | 557
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java | 946
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java | 45
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java | 113
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java | 250
42 files changed, 1726 insertions, 3229 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index bbf12d4b8..b366d34d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -238,8 +238,11 @@ class ReaderState {
/**
* Prepare the schema for this reader. Called for the first reader within a
- * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>. If
- * this is an early-schema reader, then the result set loader already has
+ * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>.
+ * Asks the reader if it can provide a schema-only empty batch by calling
+ * the reader's <tt>defineSchema()</tt> method. If this is an early-schema
+ * reader, and it can provide a schema, then it should create an empty
+ * batch so that the result set loader already has
* the proper value vectors set up. If this is a late-schema reader, we must
* read one batch to get the schema, then set aside the data for the next
* call to <tt>next()</tt>.
@@ -255,9 +258,10 @@ class ReaderState {
* <li>If it turned out that the file was
* empty when trying to read the schema, <tt>open()</tt> returned false
* and this method should never be called.</li>
- * <li>Otherwise, if a schema was available, then the schema is already
- * set up in the result set loader as the result of schema negotiation, and
- * this method simply returns <tt>true</tt>.
+ * <li>Otherwise, the reader does not know if it is the first reader or
+ * not. The call to <tt>defineSchema()</tt> notifies the reader that it
+ * is the first one. The reader should set up the result set loader
+ * with an empty batch.
* </ul>
* <p>
* Semantics for late-schema readers:
@@ -280,14 +284,12 @@ class ReaderState {
protected boolean buildSchema() {
- VectorContainer container = reader.output();
-
- if (container != null) {
+ if (reader.defineSchema()) {
// Bind the output container to the output of the scan operator.
// This returns an empty batch with the schema filled in.
- scanOp.containerAccessor.setContainer(container);
+ scanOp.containerAccessor.setContainer(reader.output());
schemaVersion = reader.schemaVersion();
return true;
}
@@ -297,7 +299,8 @@ class ReaderState {
if (! next()) {
return false;
}
- container = reader.output();
+ VectorContainer container = reader.output();
+ schemaVersion = reader.schemaVersion();
if (container.getRecordCount() == 0) {
return true;
}
@@ -374,8 +377,7 @@ class ReaderState {
private boolean readBatch() {
- // Try to read a batch. This may fail. If so, clean up the
- // mess.
+ // Try to read a batch. This may fail. If so, clean up the mess.
boolean more;
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
index c0985b538..61de584f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
@@ -84,8 +84,6 @@ import org.apache.drill.exec.record.VectorContainer;
public interface RowBatchReader {
- enum Result { OK, LAST_BATCH, EOF }
-
/**
* Name used when reporting errors. Can simply be the class name.
*
@@ -111,6 +109,22 @@ public interface RowBatchReader {
boolean open();
/**
+ * Called for the first reader within a scan. Allows the reader to
+ * provide an empty batch with only the schema filled in. Readers that
+ * are "early schema" (know the schema up front) should return true
+ * and create an empty batch. Readers that are "late schema" should
+ * return false. In that case, the scan operator will ask the reader
+ * to load an actual data batch, and infer the schema from that batch.
+ * <p>
+ * This step is optional and is purely for performance.
+ *
+ * @return true if this reader can (and has) defined an empty batch
+ * to describe the schema, false otherwise
+ */
+
+ boolean defineSchema();
+
+ /**
* Read the next batch. Reading continues until either EOF,
* or until the mutator indicates that the batch is full.
* The batch is considered valid if it is non-empty. Returning
@@ -129,7 +143,7 @@ public interface RowBatchReader {
* <tt>next()</tt> should be called again, <tt>false</tt> to indicate
* that EOF was reached
*
- * @throws RutimeException (<tt>UserException</tt> preferred) if an
+ * @throws RuntimeException (<tt>UserException</tt> preferred) if an
* error occurs that should fail the query.
*/
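To make the new defineSchema() contract concrete, here is a minimal reader-side sketch. It is illustrative only: the knownSchema and loader fields are assumed, only the method signature comes from the interface above, and startEmptyBatch() is borrowed from the result set loader as used later in this patch.

  @Override
  public boolean defineSchema() {
    if (knownSchema == null) {
      // Late schema: decline; the scan operator will read a data batch
      // and infer the schema from it instead.
      return false;
    }
    // Early schema: produce an empty, schema-only batch so that output()
    // returns a container with the value vectors already defined.
    loader.startEmptyBatch();
    return true;
  }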
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
index 9e174146f..04b2c7eb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
@@ -20,15 +20,23 @@ package org.apache.drill.exec.physical.impl.scan;
import org.apache.drill.exec.ops.OperatorContext;
/**
- * Interface to the set of readers, and reader schema, that the
- * scan operator manages. The reader factory creates and returns
- * the readers to use for the scan, as determined by the specific
- * physical plan. The reader factory also
- * translates from the select list provided
- * in the physical plan to the actual columns returned from the
- * scan operator. The translation is reader-specific; this
- * interface allows the scan operator to trigger various
- * lifecycle events.
+ * Interface to the set of readers, and reader schema, that the scan operator
+ * manages. The reader factory creates and returns the readers to use for the
+ * scan, as determined by the specific physical plan. The reader factory also
+ * translates from the select list provided in the physical plan to the actual
+ * columns returned from the scan operator. The translation is reader-specific;
+ * this interface allows the scan operator to trigger various lifecycle events.
+ * <p>
+ * This interface decouples the scan implementation from the generic tasks
+ * needed to implement Drill's Volcano iterator protocol for operators, and
+ * Drill's schema and batch semantics. A scan implementation need only
+ * implement this interface to add plugin-specific scan behavior.
+ * <p>
+ * While this interface allows a wide variety of implementations, the intent is
+ * that most actual scanners will use the "managed" framework that handles the
+ * routine projection, vector management and other tasks that tend to be common
+ * across scanners. See {@link ScanSchemaOrchestrator} for the managed
+ * framework.
*/
public interface ScanOperatorEvents {
@@ -46,11 +54,25 @@ public interface ScanOperatorEvents {
void bind(OperatorContext context);
+ /**
+ * A scanner typically reads multiple data sources (such as files or
+ * file blocks.) A batch reader handles each read. This method returns
+ * the next reader in whatever sequence that this scan defines.
+ * <p>
+ * The preferred implementation is to create each batch reader in this
+ * call to minimize resource usage. Production queries may read
+ * thousands of files or blocks, so incremental reader creation can be
+ * far more efficient than creating readers at the start of the scan.
+ *
+ * @return a batch reader for one of the scan elements within the
+ * scan physical plan for this scan operator
+ */
+
RowBatchReader nextReader();
/**
* Called when the scan operator itself is closed. Indicates that no more
- * readers are available (or will be opened).
+ * readers are available.
*/
void close();
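As a concrete illustration of the incremental-creation advice above, a minimal ScanOperatorEvents implementation might look like the sketch below. The split iterator and createReaderFor() factory are hypothetical, and returning null when no readers remain is an assumption rather than part of the contract shown here.

import java.util.Iterator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.hadoop.mapred.FileSplit;

public class OnDemandScanEvents implements ScanOperatorEvents {
  private final Iterator<FileSplit> splits;
  private OperatorContext context;

  public OnDemandScanEvents(Iterator<FileSplit> splits) {
    this.splits = splits;
  }

  @Override
  public void bind(OperatorContext context) {
    this.context = context;
  }

  @Override
  public RowBatchReader nextReader() {
    // Create each reader only when the scan asks for it, so a query over
    // thousands of splits never holds thousands of readers at once.
    return splits.hasNext() ? createReaderFor(splits.next()) : null;
  }

  private RowBatchReader createReaderFor(FileSplit split) {
    // Hypothetical, format-specific factory; a real implementation would
    // construct its RowBatchReader here.
    throw new UnsupportedOperationException("illustrative sketch only");
  }

  @Override
  public void close() { }
}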
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index f6c72b120..966a03901 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -35,14 +35,24 @@ import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput;
* expands to `columns`.</li>
* <li>If the columns array appears, then no other table columns
* can appear.</li>
- * <li>If the columns array appears, then the wildcard cannot also
- * appear, unless that wildcard expanded to be `columns` as
- * described above.</li>
+ * <li>Both 'columns' and the wildcard can appear for queries such
+ * as:<code><pre>
+ * select * from dfs.`multilevel/csv`
+ * where columns[1] < 1000</pre>
+ * </code></li>
* <li>The query can select specific elements such as `columns`[2].
* In this case, only array elements can appear, not the unindexed
* `columns` column.</li>
+ * <li>It is possible for `columns` to appear twice. In this case,
+ * the project operator will make a copy.</li>
* </ul>
* <p>
+ * To handle these cases, the general rule is: allow any number
+ * of wildcard or `columns` appearances in the input projection, but
+ * collapse them all down to a single occurrence of `columns` in the
+ * output projection. (Upstream code will prevent `columns` from
+ * appearing twice in its non-indexed form.)
+ * <p>
* It falls to this parser to detect a not-uncommon user error, a
* query such as the following:<pre><code>
* SELECT max(columns[1]) AS col1
@@ -83,7 +93,8 @@ public class ColumnsArrayParser implements ScanProjectionParser {
@Override
public boolean parse(RequestedColumn inCol) {
if (requireColumnsArray && inCol.isWildcard()) {
- expandWildcard();
+ createColumnsCol(
+ new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL));
return true;
}
if (! inCol.nameEquals(ColumnsArrayManager.COLUMNS_COL)) {
@@ -113,41 +124,24 @@ public class ColumnsArrayParser implements ScanProjectionParser {
.build(logger);
}
}
-
- // Special `columns` array column.
-
- columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
- builder.addTableColumn(columnsArrayCol);
+ createColumnsCol(inCol);
return true;
}
- /**
- * Query contained SELECT *, and we know that the reader supports only
- * the `columns` array; go ahead and expand the wildcard to the only
- * possible column.
- */
+ private void createColumnsCol(RequestedColumn inCol) {
+
+ // Special `columns` array column. Allow multiple, but
+ // project only one.
- private void expandWildcard() {
if (columnsArrayCol != null) {
- throw UserException
- .validationError()
- .message("Cannot select columns[] and `*` together")
- .build(logger);
+ return;
}
- columnsArrayCol = new UnresolvedColumnsArrayColumn(
- new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL));
+ columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
builder.addTableColumn(columnsArrayCol);
}
@Override
- public void validate() {
- if (builder.hasWildcard() && columnsArrayCol != null) {
- throw UserException
- .validationError()
- .message("Cannot select `columns` and `*` together")
- .build(logger);
- }
- }
+ public void validate() { }
@Override
public void validateColumn(ColumnProjection col) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
index f9674dc2a..ae8502b52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.scan.file;
+import java.util.HashSet;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -38,6 +40,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
private final FileMetadataManager metadataManager;
private final Pattern partitionPattern;
private ScanLevelProjection builder;
+ private final Set<Integer> referencedPartitions = new HashSet<>();
// Output
@@ -64,6 +67,11 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
if (defn != null) {
return buildMetadataColumn(defn, inCol);
}
+ if (inCol.isWildcard()) {
+ buildWildcard();
+
+ // Don't consider this a match.
+ }
return false;
}
@@ -80,11 +88,18 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
// Partition column
- builder.addMetadataColumn(
- new PartitionColumn(
- inCol.name(),
- Integer.parseInt(m.group(1))));
- hasImplicitCols = true;
+ int partitionIndex = Integer.parseInt(m.group(1));
+ if (! referencedPartitions.contains(partitionIndex)) {
+ builder.addMetadataColumn(
+ new PartitionColumn(
+ inCol.name(),
+ partitionIndex));
+
+ // Remember the partition for later wildcard expansion
+
+ referencedPartitions.add(partitionIndex);
+ hasImplicitCols = true;
+ }
return true;
}
@@ -107,8 +122,52 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
return true;
}
+ private void buildWildcard() {
+ if (metadataManager.useLegacyWildcardExpansion &&
+ metadataManager.useLegacyExpansionLocation) {
+
+ // Star column: this is a SELECT * query.
+
+ // Old-style wildcard handling inserts all partition columns in
+ // the scanner, removes them in Project.
+ // Fill in the file metadata columns. Can do here because the
+ // set is constant across all files.
+
+ expandPartitions();
+ }
+ }
+
@Override
- public void validate() { }
+ public void validate() {
+
+ // Expand partitions if a wildcard appears, if the feature to expand
+ // partitions for wildcards is enabled, and if the partitions belong
+ // after the data columns.
+
+ if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion &&
+ ! metadataManager.useLegacyExpansionLocation) {
+ expandPartitions();
+ }
+ }
+
+ private void expandPartitions() {
+
+ // Legacy wildcard expansion: include the file partitions for this file.
+ // This is a disadvantage for a * query: files at different directory
+ // levels will have different numbers of columns. Would be better to
+ // return this data as an array at some point.
+ // Append this after the *, keeping the * for later expansion.
+
+ for (int i = 0; i < metadataManager.partitionCount(); i++) {
+ if (referencedPartitions.contains(i)) {
+ continue;
+ }
+ builder.addMetadataColumn(new PartitionColumn(
+ metadataManager.partitionName(i), i));
+ referencedPartitions.add(i);
+ }
+ hasImplicitCols = true;
+ }
@Override
public void validateColumn(ColumnProjection outCol) { }
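A worked example (illustrative only, assuming a two-level partitioned directory) of how the referencedPartitions set above prevents duplicate partition columns when a wildcard and an explicit dirN reference appear together:

// SELECT dir1, * ...  with useLegacyWildcardExpansion = true:
//   parse("dir1")      -> adds PartitionColumn(dir1, 1); referencedPartitions = {1}
//   parse("*")         -> expansion happens now or in validate(), depending on
//                         useLegacyExpansionLocation
//   expandPartitions() -> adds PartitionColumn(dir0, 0) only; index 1 is skipped
//                         because it was already referenced explicitly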
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
index fe4332a34..ba49a9f54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
@@ -53,6 +53,31 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
protected final String partitionDesignator;
protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>();
protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap();
+
+ /**
+ * Indicates whether to expand partition columns when the query contains a wildcard.
+ * Supports queries such as the following:<code><pre>
+ * select * from dfs.`partitioned-dir`
+ * </pre></code>
+ * in which case the output columns will be (columns, dir0) if the partitioned directory
+ * has one level of nesting.
+ *
+ * See {@link TestImplicitFileColumns#testImplicitColumns}
+ */
+ protected final boolean useLegacyWildcardExpansion;
+
+ /**
+ * In legacy mode, above, Drill expands partition columns whenever the
+ * wildcard appears. Drill 1.1 - 1.11 put expanded partition columns after
+ * data columns. This is actually a better position as it minimizes changes
+ * to the row layout for files at different depths. Drill 1.12 moved them before
+ * data columns: at the location of the wildcard.
+ * <p>
+ * This flag, when set, uses the Drill 1.12 position. Later enhancements
+ * can unset this flag to go back to the future: use the preferred location
+ * after other columns.
+ */
+ protected final boolean useLegacyExpansionLocation;
private final FileMetadataColumnsParser parser;
// Internal state
@@ -84,7 +109,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
*/
public FileMetadataManager(OptionSet optionManager,
+ boolean useLegacyWildcardExpansion,
+ boolean useLegacyExpansionLocation,
Path rootDir, List<Path> files) {
+ this.useLegacyWildcardExpansion = useLegacyWildcardExpansion;
+ this.useLegacyExpansionLocation = useLegacyExpansionLocation;
scanRootDir = rootDir;
partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
@@ -117,6 +146,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
}
}
+ public FileMetadataManager(OptionSet optionManager,
+ Path rootDir, List<Path> files) {
+ this(optionManager, false, false, rootDir, files);
+ }
+
private int computeMaxPartition(List<Path> files) {
int maxLen = 0;
for (Path filePath : files) {
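A usage sketch of the two new flags, for a caller that wants the legacy SELECT * expansion with partition columns placed after the data columns (the Drill 1.1 - 1.11 position described above). The option manager, root directory and file list are placeholders.

FileMetadataManager metadataManager = new FileMetadataManager(
    optionManager,
    true,     // useLegacyWildcardExpansion: expand dirN columns for SELECT *
    false,    // useLegacyExpansionLocation: false puts partitions after data columns
    scanRootDir,
    files);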
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index 609e9f05f..6ecf0cf05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -32,19 +32,46 @@ import org.apache.hadoop.mapred.FileSplit;
/**
* The file scan framework adds into the scan framework support for implicit
- * file metadata columns.
+ * file metadata columns. The file scan framework brings together a number of
+ * components:
+ * <ul>
+ * <li>The projection list provided by the physical operator definition. This
+ * list identifies the set of "output" columns which this framework is obliged
+ * to produce.</li>
+ * <li>The set of files and/or blocks to read.</li>
+ * <li>The file system configuration to use for working with the files
+ * or blocks.</li>
+ * <li>The factory class to create a reader for each of the files or blocks
+ * defined above. (Readers are created one-by-one as files are read.)</li>
+ * <li>Options as defined by the base class.</li>
+ * </ul>
+ * <p>
+ * @see AbstractScanFramework for details.
*/
public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiator> {
- public interface FileReaderCreator {
+ /**
+ * Creates a batch reader on demand. Unlike earlier versions of Drill,
+ * this framework creates readers one by one, when they are needed.
+ * Doing so avoids excessive resource demands that come from creating
+ * potentially thousands of readers up front.
+ * <p>
+ * The reader itself is unique to each file type. This interface
+ * provides a common interface that this framework can use to create the
+ * file-specific reader on demand.
+ */
+
+ public interface FileReaderFactory {
ManagedReader<FileSchemaNegotiator> makeBatchReader(
DrillFileSystem dfs,
FileSplit split) throws ExecutionSetupException;
}
/**
- * Implementation of the file-level schema negotiator.
+ * Implementation of the file-level schema negotiator. At present, no
+ * file-specific features exist. This class shows, however, where we would
+ * add such features.
*/
public static class FileSchemaNegotiatorImpl extends SchemaNegotiatorImpl
@@ -55,12 +82,12 @@ public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiato
}
}
- private final FileReaderCreator readerCreator;
+ private final FileReaderFactory readerCreator;
public FileScanFramework(List<SchemaPath> projection,
List<? extends FileWork> files,
Configuration fsConf,
- FileReaderCreator readerCreator) {
+ FileReaderFactory readerCreator) {
super(projection, files, fsConf);
this.readerCreator = readerCreator;
}
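Because the renamed FileReaderFactory has a single method, a plugin can supply it as a lambda when constructing the framework. CsvBatchReader is a hypothetical ManagedReader<FileSchemaNegotiator> used purely for illustration; the constructor arguments to FileScanFramework come from the diff above.

FileScanFramework framework = new FileScanFramework(
    projection, files, fsConf,
    (DrillFileSystem dfs, FileSplit split) -> new CsvBatchReader(dfs, split));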
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
index 4a15ff787..bf132651b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
@@ -44,4 +44,17 @@ public abstract class MetadataColumn extends ResolvedColumn implements ConstantC
public String name() { return schema.getName(); }
public abstract MetadataColumn resolve(FileMetadata fileInfo, VectorSource source, int sourceIndex);
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" schema=\"")
+ .append(schema.toString())
+ .append(", value=")
+ .append(value)
+ .append("]")
+ .toString();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
index 54079013a..d285261c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
@@ -40,15 +40,91 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
* config options, and implements the matching "managed reader". All details
* of setup, projection, and so on are handled by the framework and the components
* that the framework builds upon.
+ *
+ * <h4>Inputs</h4>
+ *
+ * At this basic level, a scan framework requires just a few simple inputs:
+ * <ul>
+ * <li>The projection list provided by the physical operator definition. This
+ * list identifies the set of "output" columns which this framework is obliged
+ * to produce.</li>
+ * <li>The operator context which provides access to a memory allocator and
+ * other plumbing items.</li>
+ * <li>A method to create a reader for each of the files or blocks
+ * to be scanned. (Readers are created one-by-one as files are read.)</li>
+ * <li>The data type to use for projected columns which the reader cannot
+ * provide. (Drill allows such columns and fills in null values: traditionally
+ * nullable Int, but customizable here.)</li>
+ * <li>Various other options.</li>
+ * </ul>
+ *
+ * <h4>Orchestration</h4>
+ *
+ * The above is sufficient to drive the entire scan operator functionality.
+ * Projection is done generically and is the same for all files. Only the
+ * reader (created via the factory class) differs from one type of file to
+ * another.
+ * <p>
+ * The framework achieves the work described below by composing a large
+ * set of detailed classes, each of which performs some specific task. This
+ * structure leaves the reader to simply infer schema and read data.
+ * <p>
+ * In particular, rather than do all the orchestration here (which would tie
+ * that logic to the scan operation), the detailed work is delegated to the
+ * {@link ScanSchemaOrchestrator} class, with this class as a "shim" between
+ * the the Scan events API and the schema orchestrator implementation.
+ *
+ * <h4>Reader Integration</h4>
+ *
+ * The details of how a file is structured, how a schema is inferred, how
+ * data is decoded: all that is encapsulated in the reader. The only real
+ * interaction between the reader and the framework is:
+ * <ul>
+ * <li>The reader "negotiates" a schema with the framework. The framework
+ * knows the projection list from the query plan, knows something about
+ * data types (whether a column should be scalar, a map or an array), and
+ * knows about the schema already defined by prior readers. The reader knows
+ * what schema it can produce (if "early schema.") The schema negotiator
+ * class handles this task.</li>
+ * <li>The reader reads data from the file and populates value vectors a
+ * batch at a time. The framework creates the result set loader to use for
+ * this work. The schema negotiator returns that loader to the reader, which
+ * uses it during read.
+ * <p>
+ * It is important to note that the result set loader also defines a schema:
+ * the schema requested by the reader. If the reader wants to read three
+ * columns, a, b, and c, then that is the schema that the result set loader
+ * supports. This is true even if the query plan only wants column a, or
+ * wants columns c, a. The framework handles the projection task so the
+ * reader does not have to worry about it. Reading an unwanted column
+ * is low cost: the result set loader will have provided a "dummy" column
+ * writer that simply discards the value. This is just as fast as having the
+ * reader use if-statements or a table to determine which columns to save.
+ * <p>
+ * A reader may be "late schema", true "schema on read." In this case, the
+ * reader simply tells the result set loader to create a new column writer
+ * on the fly. The framework will work out if that new column is to be
+ * projected and will return either a real column writer (projected column)
+ * or a dummy column writer (unprojected column.)</li>
+ * <li>The reader then reads batches of data until all data is read. The
+ * result set loader signals when a batch is full; the reader should not
+ * worry about this detail itself.</li>
+ * <li>The reader then releases its resources.</li>
+ * </ul>
*/
public abstract class AbstractScanFramework<T extends SchemaNegotiator> implements ScanOperatorEvents {
+ // Inputs
+
protected final List<SchemaPath> projection;
protected MajorType nullType;
protected int maxBatchRowCount;
protected int maxBatchByteCount;
protected OperatorContext context;
+
+ // Internal state
+
protected ScanSchemaOrchestrator scanOrchestrator;
public AbstractScanFramework(List<SchemaPath> projection) {
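To illustrate the projection point above, a rough sketch of a reader's write loop: the reader writes every column it knows about, and the framework silently routes unprojected columns to dummy writers. The hasMoreRows(), readA() and readB() calls are hypothetical reader internals, and the writer method names are assumed from the result set loader API rather than defined in this patch.

RowSetLoader writer = loader.writer();
while (!writer.isFull() && hasMoreRows()) {
  writer.start();
  writer.scalar("a").setInt(readA());       // projected: backed by a real vector
  writer.scalar("b").setString(readB());    // unprojected: discarded by a dummy writer
  writer.save();
}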
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index 631881265..dead9cb46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -65,9 +65,13 @@ public interface SchemaNegotiator {
* columns during the read.
*
* @param schema the table schema if known at open time
+ * @param isComplete true if the schema is complete: if it can be used
+ * to define an empty schema-only batch for the first reader. Set to
+ * false if the schema is partial: if the reader must read rows to
+ * determine the full schema
*/
- void setTableSchema(TupleMetadata schema);
+ void setTableSchema(TupleMetadata schema, boolean isComplete);
/**
* Set the preferred batch size (which may be overridden by the
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index 46f363dd5..0841049fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -53,6 +53,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
protected final AbstractScanFramework<?> basicFramework;
private final ShimBatchReader<? extends SchemaNegotiator> shim;
protected TupleMetadata tableSchema;
+ protected boolean isSchemaComplete;
protected int batchSize = ValueVector.MAX_ROW_COUNT;
public SchemaNegotiatorImpl(AbstractScanFramework<?> framework, ShimBatchReader<? extends SchemaNegotiator> shim) {
@@ -66,8 +67,9 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
}
@Override
- public void setTableSchema(TupleMetadata schema) {
+ public void setTableSchema(TupleMetadata schema, boolean isComplete) {
tableSchema = schema;
+ this.isSchemaComplete = schema != null && isComplete;
}
@Override
@@ -97,4 +99,6 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
public boolean isProjectionEmpty() {
return basicFramework.scanOrchestrator().isProjectNone();
}
+
+ public boolean isSchemaComplete() { return isSchemaComplete; }
}
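From the reader's side, the revised negotiation might look like the following fragment inside a managed reader's open path. buildKnownSchema() is a hypothetical helper, and the build() call returning the result set loader is assumed from the surrounding framework; only setTableSchema(TupleMetadata, boolean) is defined by this patch.

TupleMetadata schema = buildKnownSchema();
// Pass true only when the schema fully describes the data source, so the
// framework can emit an empty, schema-only first batch. Pass false if more
// columns may be discovered while reading rows.
negotiator.setTableSchema(schema, true);
ResultSetLoader loader = negotiator.build();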
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
index 0dc3c5736..a97b32968 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.framework;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.record.VectorContainer;
@@ -44,6 +44,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
protected final AbstractScanFramework<T> manager;
protected final ManagedReader<T> reader;
protected final ReaderSchemaOrchestrator readerOrchestrator;
+ protected SchemaNegotiatorImpl schemaNegotiator;
protected ResultSetLoader tableLoader;
/**
@@ -96,10 +97,19 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
}
@Override
+ public boolean defineSchema() {
+ if (schemaNegotiator.isSchemaComplete()) {
+ readerOrchestrator.defineSchema();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
public boolean next() {
// The reader may report EOF, but the result set loader might
- // have a lookhead row.
+ // have a lookahead row.
if (eof && ! tableLoader.hasRows()) {
return false;
@@ -181,6 +191,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
}
public ResultSetLoader build(SchemaNegotiatorImpl schemaNegotiator) {
+ this.schemaNegotiator = schemaNegotiator;
readerOrchestrator.setBatchSize(schemaNegotiator.batchSize);
tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.tableSchema);
return tableLoader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
index efd881bb1..096b8447b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
@@ -29,7 +29,7 @@
* is an old version without a new column c, while file B includes the column.
* And so on.
* <p>
- * The scan operator here works to ensure schema continuity as much as
+ * The scan operator works to ensure schema continuity as much as
* possible, smoothing out "soft" schema changes that are simply artifacts of
* reading a collection of files. Only "hard" changes (true changes) are
* passed downstream.
@@ -157,5 +157,29 @@
* output batch in the order specified by the original SELECT list (or table order,
* if the original SELECT had a wildcard.) Fortunately, this just involves
* moving around pointers to vectors; no actual data is moved during projection.
+ *
+ * <h4>Class Structure</h4>
+ *
+ * Some of the key classes here include:
+ * <ul>
+ * <li>{@link RowBatchReader} an extremely simple interface for reading data.
+ * We would like many developers to create new plugins and readers. The simplified
+ * interface pushes all complexity into the scan framework, leaving the reader to
+ * just read.</li>
+ * <li>{@link ShimBatchReader} an implementation of the above that bridges the
+ * simplified API to the additional structure needed to work with the result set loader.
+ * (The base interface is agnostic about how rows are read.)</li>
+ * <li>{@link SchemaNegotiator} an interface that allows a batch reader to
+ * "negotiate" a schema with the scan framework. The scan framework knows the
+ * columns that are to be projected. The reader knows what columns it can offer.
+ * The schema negotiator works out how to combine the two. It expresses the result
+ * as a result set loader. Column writers are defined for all columns that the
+ * reader wants to read, but only the materialized (projected) columns have actual
+ * vectors behind them. The non-projected columns are "free-wheeling" "dummy"
+ * writers.
+ * </li>
+ * </ul>
+ * <p>
+ * And, yes, sorry for the terminology. File "readers" read from files, but
+ * use column "writers" to write to value vectors.
*/
package org.apache.drill.exec.physical.impl.scan.framework;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
index 3e302a1e8..d7de30ad6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
@@ -36,9 +36,90 @@
* <p>
* See {@link ScanOperatorExec} for details of the scan operator protocol
* and components.
+ *
+ * <h4>Traditional Class Structure</h4>
+ * The original design was simple, but required each reader to handle many
+ * detailed tasks.
+ * <pre><code>
+ * +------------+ +-----------+
+ * | Scan Batch | +---> | ScanBatch |
+ * | Creator | | +-----------+
+ * +------------+ | |
+ * | | |
+ * v | |
+ * +------------+ | v
+ * | Format | ---+ +---------------+
+ * | Plugin | -----> | Record Reader |
+ * +------------+ +---------------+
+ *
+ * </code></pre>
+ *
+ * The scan batch creator is unique to each storage plugin and is created
+ * based on the physical operator configuration ("pop config"). The
+ * scan batch creator delegates to the format plugin to create both the
+ * scan batch (the scan operator) and the set of readers which the scan
+ * batch will manage.
+ * <p>
+ * The scan batch
+ * provides a <code>Mutator</code> that creates the vectors used by the
+ * record readers. Schema continuity comes from reusing the Mutator from one
+ * file/block to the next.
+ * <p>
+ * One characteristic of this system is that all the record readers are
+ * created up front. If we must read 1000 blocks, we'll create 1000 record
+ * readers. Developers must be very careful to only allocate resources when
+ * the reader is opened, and release resources when the reader is closed.
+ * Else, resource bloat becomes a large problem.
+ *
+ * <h4>Revised Class Structure</h4>
+ *
+ * The new design is more complex because it divides tasks up into separate
+ * classes. The class structure is larger, but each class is smaller, more
+ * focused and does just one task.
+ * <pre><code>
+ * +------------+ +---------------+
+ * | Scan Batch | -------> | Format Plugin |
+ * | Creator | +---------------+
+ * +------------+ / | \
+ * / | \
+ * +---------------------+ | \ +---------------+
+ * | OperatorRecordBatch | | +---->| ScanFramework |
+ * +---------------------+ | | +---------------+
+ * v | |
+ * +------------------+ |
+ * | ScanOperatorExec | |
+ * +------------------+ v
+ * | +--------------+
+ * +----------> | Batch Reader |
+ * +--------------+
+ * </code></pre>
+ *
+ * Here, the scan batch creator again delegates to the format plugin. The
+ * format plugin creates three objects:
+ * <ul>
+ * <li>The <code>OperatorRecordBatch</code>, which encapsulates the Volcano
+ * iterator protocol. It also holds onto the output batch. This allows the
+ * operator implementation to just focus on its specific job.</li>
+ * <li>The <code>ScanOperatorExec</code> is the operator implementation for
+ * the new result-set-loader based scan.</li>
+ * <li>The scan framework is specific to each kind of reader. It handles
+ * everything which is unique to that reader. Rather than inheriting from
+ * the scan itself, the framework follows the strategy pattern: it says how
+ * to do a scan for the target format.</li>
+ * </ul>
+ *
+ * The overall structure uses the "composition" pattern: what is combined
+ * into a small set of classes in the traditional model is broken out into
+ * focused classes in the revised model.
+ * <p>
+ * A key part of the scan strategy is the batch reader. ("Batch" because
+ * it reads an entire batch at a time, using the result set loader.) The
+ * framework creates batch readers one by one as needed. Resource bloat
+ * is less of an issue because only one batch reader instance exists at
+ * any time for each scan operator instance.
* <p>
- * See the "managed" package for a reusable framework for handling the
- * details of batches, schema and so on.
+ * Each of the above is further broken down into additional classes to
+ * handle projection and so on.
*/
package org.apache.drill.exec.physical.impl.scan;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index 41cc59582..c0bcfa3b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
*/
public class ExplicitSchemaProjection extends SchemaLevelProjection {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitSchemaProjection.class);
public ExplicitSchemaProjection(ScanLevelProjection scanProj,
TupleMetadata tableSchema,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
new file mode 100644
index 000000000..029b6a005
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Orchestrates projection tasks for a single reader within the set that the
+ * scan operator manages. Vectors are reused across readers, but via a vector
+ * cache. All other state is distinct between readers.
+ */
+
+public class ReaderSchemaOrchestrator implements VectorSource {
+
+ private final ScanSchemaOrchestrator scanOrchestrator;
+ private int readerBatchSize;
+ private ResultSetLoaderImpl tableLoader;
+ private int prevTableSchemaVersion = -1;
+
+ /**
+ * Assembles the table, metadata and null columns into the final output
+ * batch to be sent downstream. The key goal of this class is to "smooth"
+ * schema changes in this output batch by absorbing trivial schema changes
+ * that occur across readers.
+ */
+
+ private ResolvedRow rootTuple;
+ private VectorContainer tableContainer;
+
+ public ReaderSchemaOrchestrator(ScanSchemaOrchestrator scanSchemaOrchestrator) {
+ scanOrchestrator = scanSchemaOrchestrator;
+ readerBatchSize = scanOrchestrator.scanBatchRecordLimit;
+ }
+
+ public void setBatchSize(int size) {
+ if (size > 0) {
+ readerBatchSize = Math.min(size, scanOrchestrator.scanBatchRecordLimit);
+ }
+ }
+
+ public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
+ OptionBuilder options = new OptionBuilder();
+ options.setRowCountLimit(readerBatchSize);
+ options.setVectorCache(scanOrchestrator.vectorCache);
+ options.setBatchSizeLimit(scanOrchestrator.scanBatchByteLimit);
+
+ // Set up a selection list if available and is a subset of
+ // table columns. (Only needed for non-wildcard queries.)
+ // The projection list includes all candidate table columns
+ // whether or not they exist in the up-front schema. Handles
+ // the odd case where the reader claims a fixed schema, but
+ // adds a column later.
+
+ if (! scanOrchestrator.scanProj.projectAll()) {
+ options.setProjectionSet(scanOrchestrator.scanProj.readerProjection());
+ }
+ options.setSchema(tableSchema);
+
+ // Create the table loader
+
+ tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator, options.build());
+ return tableLoader;
+ }
+
+ public boolean hasSchema() {
+ return prevTableSchemaVersion >= 0;
+ }
+
+ public void defineSchema() {
+ tableLoader.startEmptyBatch();
+ endBatch();
+ }
+
+ public void startBatch() {
+ tableLoader.startBatch();
+ }
+
+ /**
+ * Build the final output batch by projecting columns from the three input sources
+ * to the output batch. First, build the metadata and/or null columns for the
+ * table row count. Then, merge the sources.
+ */
+
+ public void endBatch() {
+
+ // Get the batch results in a container.
+
+ tableContainer = tableLoader.harvest();
+
+ // If the schema changed, set up the final projection based on
+ // the new (or first) schema.
+
+ if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
+ reviseOutputProjection();
+ } else {
+
+ // Fill in the null and metadata columns.
+
+ populateNonDataColumns();
+ }
+ rootTuple.setRowCount(tableContainer.getRecordCount());
+ }
+
+ private void populateNonDataColumns() {
+ int rowCount = tableContainer.getRecordCount();
+ scanOrchestrator.metadataManager.load(rowCount);
+ rootTuple.loadNulls(rowCount);
+ }
+
+ /**
+ * Create the list of null columns by comparing the SELECT list against the
+ * columns available in the batch schema. Create null columns for those that
+ * are missing. This is done for the first batch, and any time the schema
+ * changes. (For early-schema, the projection occurs once as the schema is set
+ * up-front and does not change.) For a SELECT *, the null column check
+ * only need be done if null columns were created when mapping from a prior
+ * schema.
+ */
+
+ private void reviseOutputProjection() {
+
+ // Do the table-schema level projection; the final matching
+ // of projected columns to available columns.
+
+ TupleMetadata tableSchema = tableLoader.harvestSchema();
+ if (scanOrchestrator.schemaSmoother != null) {
+ doSmoothedProjection(tableSchema);
+ } else if (scanOrchestrator.scanProj.hasWildcard()) {
+ doWildcardProjection(tableSchema);
+ } else {
+ doExplicitProjection(tableSchema);
+ }
+
+ // Combine metadata, nulls and batch data to form the final
+ // output container. Columns are created by the metadata and null
+ // loaders only in response to a batch, so create the first batch.
+
+ rootTuple.buildNulls(scanOrchestrator.vectorCache);
+ scanOrchestrator.metadataManager.define();
+ populateNonDataColumns();
+ rootTuple.project(tableContainer, scanOrchestrator.outputContainer);
+ prevTableSchemaVersion = tableLoader.schemaVersion();
+ }
+
+ private void doSmoothedProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(
+ new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns));
+ scanOrchestrator.schemaSmoother.resolve(tableSchema, rootTuple);
+ }
+
+ /**
+ * Query contains a wildcard. The schema-level projection includes
+ * all columns provided by the reader.
+ */
+
+ private void doWildcardProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(null);
+ new WildcardSchemaProjection(scanOrchestrator.scanProj,
+ tableSchema, rootTuple, scanOrchestrator.schemaResolvers);
+ }
+
+ /**
+ * Explicit projection: include only those columns actually
+ * requested by the query, which may mean filling in null
+ * columns for projected columns that don't actually exist
+ * in the table.
+ *
+ * @param tableSchema newly arrived schema
+ */
+
+ private void doExplicitProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(
+ new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns));
+ new ExplicitSchemaProjection(scanOrchestrator.scanProj,
+ tableSchema, rootTuple,
+ scanOrchestrator.schemaResolvers);
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return tableContainer.getValueVector(index).getValueVector();
+ }
+
+ public void close() {
+ RuntimeException ex = null;
+ try {
+ if (tableLoader != null) {
+ tableLoader.close();
+ tableLoader = null;
+ }
+ }
+ catch (RuntimeException e) {
+ ex = e;
+ }
+ try {
+ if (rootTuple != null) {
+ rootTuple.close();
+ rootTuple = null;
+ }
+ }
+ catch (RuntimeException e) {
+ ex = ex == null ? e : ex;
+ }
+ scanOrchestrator.metadataManager.endFile();
+ if (ex != null) {
+ throw ex;
+ }
+ }
+}
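The per-reader lifecycle that this new class supports can be summarized with a rough sketch. In practice the shim reader and scan orchestrator drive these calls; the schemaIsComplete and moreData flags below are placeholders.

ReaderSchemaOrchestrator reader = new ReaderSchemaOrchestrator(scanOrchestrator);
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
if (schemaIsComplete) {
  reader.defineSchema();   // emit an empty, schema-only batch first
}
while (moreData) {
  reader.startBatch();
  // ... the managed reader writes rows through the loader ...
  reader.endBatch();       // merge table, metadata and null columns for downstream
}
reader.close();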
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index 83d40a310..f90f722b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -100,8 +100,6 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
public class ScanLevelProjection {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class);
-
/**
* Interface for add-on parsers, avoids the need to create
* a single, tightly-coupled parser for all types of columns.
@@ -128,12 +126,26 @@ public class ScanLevelProjection {
// Internal state
+ protected boolean includesWildcard;
protected boolean sawWildcard;
// Output
protected List<ColumnProjection> outputCols = new ArrayList<>();
+
+ /**
+ * Projection definition for the scan as a whole. Parsed form of the input
+ * projection list.
+ */
+
protected RequestedTuple outputProjection;
+
+ /**
+ * Projection definition passed to each reader. This is the set of
+ * columns that the reader is asked to provide.
+ */
+
+ protected RequestedTuple readerProjection;
protected boolean hasWildcard;
protected boolean emptyProjection = true;
@@ -158,6 +170,18 @@ public class ScanLevelProjection {
for (ScanProjectionParser parser : parsers) {
parser.bind(this);
}
+
+ // First pass: check if a wildcard exists.
+
+ for (RequestedColumn inCol : outputProjection.projections()) {
+ if (inCol.isWildcard()) {
+ includesWildcard = true;
+ break;
+ }
+ }
+
+ // Second pass: process remaining columns.
+
for (RequestedColumn inCol : outputProjection.projections()) {
if (inCol.isWildcard()) {
mapWildcard(inCol);
@@ -169,6 +193,23 @@ public class ScanLevelProjection {
for (ScanProjectionParser parser : parsers) {
parser.build();
}
+
+ // Create the reader projection which includes either all columns
+ // (saw a wildcard) or just the unresolved columns (which excludes
+ // implicit columns.)
+
+ List<RequestedColumn> outputProj;
+ if (hasWildcard()) {
+ outputProj = null;
+ } else {
+ outputProj = new ArrayList<>();
+ for (ColumnProjection col : outputCols) {
+ if (col instanceof UnresolvedColumn) {
+ outputProj.add(((UnresolvedColumn) col).element());
+ }
+ }
+ }
+ readerProjection = RequestedTupleImpl.build(outputProj);
}
/**
@@ -181,6 +222,7 @@ public class ScanLevelProjection {
// Wildcard column: this is a SELECT * query.
+ assert includesWildcard;
if (sawWildcard) {
throw new IllegalArgumentException("Duplicate * entry in project list");
}
@@ -245,6 +287,15 @@ public class ScanLevelProjection {
}
}
+ // If the project list has a wildcard, and the column is not one recognized
+ // by the specialized parsers above, then just ignore it. It is likely a duplicate
+ // column name. In any event, it will be processed by the Project operator on
+ // top of this scan.
+
+ if (includesWildcard) {
+ return;
+ }
+
// This is a desired table column.
addTableColumn(
@@ -281,15 +332,6 @@ public class ScanLevelProjection {
for (ScanProjectionParser parser : parsers) {
parser.validateColumn(outCol);
}
- switch (outCol.nodeType()) {
- case UnresolvedColumn.UNRESOLVED:
- if (hasWildcard()) {
- throw new IllegalArgumentException("Cannot select table columns and * together");
- }
- break;
- default:
- break;
- }
}
}
@@ -333,6 +375,8 @@ public class ScanLevelProjection {
public RequestedTuple rootProjection() { return outputProjection; }
+ public RequestedTuple readerProjection() { return readerProjection; }
+
@Override
public String toString() {
return new StringBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index fe78f5a75..a5d6ca2a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -23,15 +23,10 @@ import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
-import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
-import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
-import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
/**
@@ -154,212 +149,6 @@ public class ScanSchemaOrchestrator {
public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
- /**
- * Orchestrates projection tasks for a single reader with the set that the
- * scan operator manages. Vectors are reused across readers, but via a vector
- * cache. All other state is distinct between readers.
- */
-
- public class ReaderSchemaOrchestrator implements VectorSource {
-
- private int readerBatchSize;
- private ResultSetLoaderImpl tableLoader;
- private int prevTableSchemaVersion = -1;
-
- /**
- * Assembles the table, metadata and null columns into the final output
- * batch to be sent downstream. The key goal of this class is to "smooth"
- * schema changes in this output batch by absorbing trivial schema changes
- * that occur across readers.
- */
-
- private ResolvedRow rootTuple;
- private VectorContainer tableContainer;
-
- public ReaderSchemaOrchestrator() {
- readerBatchSize = scanBatchRecordLimit;
- }
-
- public void setBatchSize(int size) {
- if (size > 0) {
- readerBatchSize = Math.min(size, scanBatchRecordLimit);
- }
- }
-
- public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
- OptionBuilder options = new OptionBuilder();
- options.setRowCountLimit(readerBatchSize);
- options.setVectorCache(vectorCache);
- options.setBatchSizeLimit(scanBatchByteLimit);
-
- // Set up a selection list if available and is a subset of
- // table columns. (Only needed for non-wildcard queries.)
- // The projection list includes all candidate table columns
- // whether or not they exist in the up-front schema. Handles
- // the odd case where the reader claims a fixed schema, but
- // adds a column later.
-
- if (! scanProj.projectAll()) {
- options.setProjectionSet(scanProj.rootProjection());
- }
- options.setSchema(tableSchema);
-
- // Create the table loader
-
- tableLoader = new ResultSetLoaderImpl(allocator, options.build());
-
- // If a schema is given, create a zero-row batch to announce the
- // schema downstream in the form of an empty batch.
-
- if (tableSchema != null) {
- tableLoader.startEmptyBatch();
- endBatch();
- }
-
- return tableLoader;
- }
-
- public boolean hasSchema() {
- return prevTableSchemaVersion >= 0;
- }
-
- public void startBatch() {
- tableLoader.startBatch();
- }
-
- /**
- * Build the final output batch by projecting columns from the three input sources
- * to the output batch. First, build the metadata and/or null columns for the
- * table row count. Then, merge the sources.
- */
-
- public void endBatch() {
-
- // Get the batch results in a container.
-
- tableContainer = tableLoader.harvest();
-
- // If the schema changed, set up the final projection based on
- // the new (or first) schema.
-
- if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
- reviseOutputProjection();
- } else {
-
- // Fill in the null and metadata columns.
-
- populateNonDataColumns();
- }
- rootTuple.setRowCount(tableContainer.getRecordCount());
- }
-
- private void populateNonDataColumns() {
- int rowCount = tableContainer.getRecordCount();
- metadataManager.load(rowCount);
- rootTuple.loadNulls(rowCount);
- }
-
- /**
- * Create the list of null columns by comparing the SELECT list against the
- * columns available in the batch schema. Create null columns for those that
- * are missing. This is done for the first batch, and any time the schema
- * changes. (For early-schema, the projection occurs once as the schema is set
- * up-front and does not change.) For a SELECT *, the null column check
- * only need be done if null columns were created when mapping from a prior
- * schema.
- */
-
- private void reviseOutputProjection() {
-
- // Do the table-schema level projection; the final matching
- // of projected columns to available columns.
-
- TupleMetadata tableSchema = tableLoader.harvestSchema();
- if (schemaSmoother != null) {
- doSmoothedProjection(tableSchema);
- } else if (scanProj.hasWildcard()) {
- doWildcardProjection(tableSchema);
- } else {
- doExplicitProjection(tableSchema);
- }
-
- // Combine metadata, nulls and batch data to form the final
- // output container. Columns are created by the metadata and null
- // loaders only in response to a batch, so create the first batch.
-
- rootTuple.buildNulls(vectorCache);
- metadataManager.define();
- populateNonDataColumns();
- rootTuple.project(tableContainer, outputContainer);
- prevTableSchemaVersion = tableLoader.schemaVersion();
- }
-
- private void doSmoothedProjection(TupleMetadata tableSchema) {
- rootTuple = new ResolvedRow(
- new NullColumnBuilder(nullType, allowRequiredNullColumns));
- schemaSmoother.resolve(tableSchema, rootTuple);
- }
-
- /**
- * Query contains a wildcard. The schema-level projection includes
- * all columns provided by the reader.
- */
-
- private void doWildcardProjection(TupleMetadata tableSchema) {
- rootTuple = new ResolvedRow(null);
- new WildcardSchemaProjection(scanProj,
- tableSchema, rootTuple, schemaResolvers);
- }
-
- /**
- * Explicit projection: include only those columns actually
- * requested by the query, which may mean filling in null
- * columns for projected columns that don't actually exist
- * in the table.
- *
- * @param tableSchema newly arrived schema
- */
-
- private void doExplicitProjection(TupleMetadata tableSchema) {
- rootTuple = new ResolvedRow(
- new NullColumnBuilder(nullType, allowRequiredNullColumns));
- new ExplicitSchemaProjection(scanProj,
- tableSchema, rootTuple,
- schemaResolvers);
- }
-
- @Override
- public ValueVector vector(int index) {
- return tableContainer.getValueVector(index).getValueVector();
- }
-
- public void close() {
- RuntimeException ex = null;
- try {
- if (tableLoader != null) {
- tableLoader.close();
- tableLoader = null;
- }
- }
- catch (RuntimeException e) {
- ex = e;
- }
- try {
- if (rootTuple != null) {
- rootTuple.close();
- rootTuple = null;
- }
- }
- catch (RuntimeException e) {
- ex = ex == null ? e : ex;
- }
- metadataManager.endFile();
- if (ex != null) {
- throw ex;
- }
- }
- }
-
// Configuration
/**
@@ -367,16 +156,16 @@ public class ScanSchemaOrchestrator {
* not set, the null type is the Drill default.
*/
- private MajorType nullType;
+ MajorType nullType;
/**
* Creates the metadata (file and directory) columns, if needed.
*/
- private MetadataManager metadataManager;
- private final BufferAllocator allocator;
- private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
- private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
+ MetadataManager metadataManager;
+ final BufferAllocator allocator;
+ int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
+ int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
private final List<ScanProjectionParser> parsers = new ArrayList<>();
/**
@@ -389,7 +178,7 @@ public class ScanSchemaOrchestrator {
List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>();
private boolean useSchemaSmoothing;
- private boolean allowRequiredNullColumns;
+ boolean allowRequiredNullColumns;
// Internal state
@@ -402,14 +191,14 @@ public class ScanSchemaOrchestrator {
* vectors rather than vector instances, this cache can be deprecated.
*/
- private ResultVectorCacheImpl vectorCache;
- private ScanLevelProjection scanProj;
+ ResultVectorCacheImpl vectorCache;
+ ScanLevelProjection scanProj;
private ReaderSchemaOrchestrator currentReader;
- private SchemaSmoother schemaSmoother;
+ SchemaSmoother schemaSmoother;
// Output
- private VectorContainer outputContainer;
+ VectorContainer outputContainer;
public ScanSchemaOrchestrator(BufferAllocator allocator) {
this.allocator = allocator;
@@ -493,20 +282,12 @@ public class ScanSchemaOrchestrator {
ScanProjectionParser parser = metadataManager.projectionParser();
if (parser != null) {
-
- // For compatibility with Drill 1.12, insert the file metadata
- // parser before others so that, in a wildcard query, metadata
- // columns appear before others (such as the `columns` column.)
- // This is temporary and should be removed once the test framework
- // is restored to Drill 1.11 functionality.
-
parsers.add(parser);
}
// Parse the projection list.
scanProj = new ScanLevelProjection(projection, parsers);
-
if (scanProj.hasWildcard() && useSchemaSmoothing) {
schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers);
}
@@ -526,7 +307,7 @@ public class ScanSchemaOrchestrator {
public ReaderSchemaOrchestrator startReader() {
closeReader();
- currentReader = new ReaderSchemaOrchestrator();
+ currentReader = new ReaderSchemaOrchestrator(this);
return currentReader;
}
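With ReaderSchemaOrchestrator promoted to its own top-level class (constructed with its parent orchestrator), the per-reader lifecycle is easiest to see end to end. A condensed sketch, assuming the allocator, project list, and a single-VARCHAR table schema from the test fixture used elsewhere in this patch:

    ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(allocator);
    scanner.build(projectList);                    // parse the scan-level projection

    ReaderSchemaOrchestrator reader = scanner.startReader();
    ResultSetLoader loader = reader.makeTableLoader(tableSchema);
    reader.defineSchema();                         // announce the schema as an empty batch

    reader.startBatch();
    loader.writer().addRow("a-value");             // rows must match tableSchema
    reader.endBatch();

    VectorContainer out = scanner.output();        // projected output for this batch
    scanner.close();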
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
index c7bae278e..a75611432 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
@@ -61,8 +61,6 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
public class SchemaLevelProjection {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class);
-
/**
* Schema-level projection is customizable. Implement this interface, and
* add an instance to the scan orchestrator, to perform custom mappings
@@ -81,10 +79,7 @@ public class SchemaLevelProjection {
protected SchemaLevelProjection(
List<SchemaProjectionResolver> resolvers) {
- if (resolvers == null) {
- resolvers = new ArrayList<>();
- }
- this.resolvers = resolvers;
+ this.resolvers = resolvers == null ? new ArrayList<>() : resolvers;
for (SchemaProjectionResolver resolver : resolvers) {
resolver.startResolution();
}
@@ -97,6 +92,8 @@ public class SchemaLevelProjection {
return;
}
}
- throw new IllegalStateException("No resolver for column: " + col.nodeType());
+ throw new IllegalStateException(
+ String.format("No resolver for column `%s` of type %d",
+ col.name(), col.nodeType()));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
index a5a52c075..155fcf886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
@@ -22,6 +22,56 @@
* or may be "missing" with null values applied. The code here prepares
* a run-time projection plan based on the actual table schema.
* <p>
+ * This package treats schema resolution as a set of transforms:
+ * <ul>
+ * <li>Scan-level projection list from the query plan: The list of columns
+ * (or the wildcard) as requested by the user in the query. The planner
+ * determines which columns to project. In Drill, projection is speculative:
+ * it is a list of names which the planner hopes will appear in the data
+ * files. The reader must make up columns (the infamous nullable INT) when
+ * it turns out that no such column exists. Otherwise, the reader must figure out
+ * the data type for any columns that do exist.
+ * <p>
+ * The scan project list defines the set of columns which the scan operator
+ * is obliged to send downstream. Ideally, the scan operator sends exactly the
+ * same schema (the project list with types filled in) for all batches. Since
+ * batches may come from different files, the scan operator must
+ * unify the schemas from those files (or blocks).</li>
+ * <li>Reader (file)-level projection occurs for each reader. A single scan
+ * may use multiple readers to read data. Each reader may offer more information
+ * about the schema. For example, a Parquet reader can obtain schema information
+ * from the Parquet headers. A JDBC reader obtains schema information from the
+ * returned schema. This is called "early schema." File-based readers can at least
+ * add implicit file or partition columns.
+ * <p>
+ * The result is a refined schema: the scan level schema with more information
+ * filled in. For Parquet, all projection information can be filled in. For
+ * CSV or JSON, we can only add file metadata information, but not yet the
+ * actual data schema.</li>
+ * <li>Batch-level schema: once a reader reads actual data, it now knows
+ * exactly what it read. This is the "schema on read" model. Thus, after reading
+ * a batch, any remaining uncertainty about the projected schema is removed.
+ * The actual data defines the data types and so on.
+ * <p>
+ * Readers such as JSON and CSV are "late schema": they don't know the data
+ * schema until they read the file. This is true "schema on read." Further, for
+ * JSON, the data may change from one batch to the next as the reader "discovers"
+ * fields that did not appear in earlier batches. This requires some amount of
+ * "schema smoothing": the ability to preserve a consistent output schema even
+ * as the input schema shifts from batch to batch.</li>
+ * </ul>
+ * <p>
+ * The goal of this mechanism is to handle the above use cases cleanly, in a
+ * common set of classes, and to avoid the need for each reader to figure out
+ * all these issues for itself (as was the case with earlier versions of
+ * Drill).
+ * <p>
+ * Because these issues are complex, the code itself is complex. To make the
+ * code easier to manage, each bit of functionality is encapsulated in a
+ * distinct class. Classes combine via composition to create a "framework"
+ * suitable for each kind of reader: whether it be early or late schema,
+ * file-based or something else, etc.
+ * <p>
* The core concept is one of successive refinement of the project
* list through a set of rewrites:
* <ul>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
index c1e383eb6..f464bae72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
@@ -37,7 +37,7 @@ public class ImpliedTupleRequest implements RequestedTuple {
new ImpliedTupleRequest(false);
public static final List<RequestedColumn> EMPTY_COLS = new ArrayList<>();
- private boolean allProjected;
+ private final boolean allProjected;
public ImpliedTupleRequest(boolean allProjected) {
this.allProjected = allProjected;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index cd782c766..4643c57d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -73,7 +73,7 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace;
public class RequestedTupleImpl implements RequestedTuple {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
private final RequestedColumnImpl parent;
private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>();
@@ -86,6 +86,13 @@ public class RequestedTupleImpl implements RequestedTuple {
this.parent = parent;
}
+ public RequestedTupleImpl(List<RequestedColumn> cols) {
+ parent = null;
+ for (RequestedColumn col : cols) {
+ projection.add(col.name(), col);
+ }
+ }
+
@Override
public RequestedColumn get(String colName) {
return projection.get(colName.toLowerCase());
@@ -119,10 +126,43 @@ public class RequestedTupleImpl implements RequestedTuple {
}
/**
+ * Create a requested tuple projection from a rewritten top-level
+ * projection list. The columns within the list have already been parsed to
+ * pick out arrays, maps and scalars. The list must not include the
+ * wildcard: a wildcard is represented by passing a null list. A null list
+ * means project everything, an empty list means project nothing, and a
+ * non-empty list means project only the listed columns.
+ *
+ * @param projList top-level, parsed columns
+ * @return the tuple projection for the top-level row
+ */
+
+ public static RequestedTuple build(List<RequestedColumn> projList) {
+ if (projList == null) {
+ return new ImpliedTupleRequest(true);
+ }
+ if (projList.isEmpty()) {
+ return new ImpliedTupleRequest(false);
+ }
+ return new RequestedTupleImpl(projList);
+ }
+
+ /**
* Parse a projection list. The list should consist of a list of column names;
- * any wildcards should have been processed by the caller. An empty list means
+ * or wildcards. An empty list means
* nothing is projected. A null list means everything is projected (that is, a
* null list here is equivalent to a wildcard in the SELECT statement.)
+ * <p>
+ * The projection list may include both a wildcard and column names (as in
+ * the case of implicit columns.) This results in a final list that both
+ * says that everything is projected, and provides the list of columns.
+ * <p>
+ * Parsing is used at two different times. First, to parse the list from
+ * the physical operator. This has the case above: an explicit wildcard
+ * and/or additional columns. Then, this class is used again to prepare the
+ * physical projection used when reading. In this case, wildcards should
+ * be removed, implicit columns pulled out, and just the list of read-level
+ * columns should remain.
*
* @param projList
* the list of projected columns, or null if no projection is to be
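The build() method added above boils the projection cases down to three. A quick sketch of its contract (a fragment; the name parsedCols is a hypothetical stand-in for an already-parsed, non-empty column list):

    RequestedTuple all  = RequestedTupleImpl.build(null);              // wildcard: project everything
    RequestedTuple none = RequestedTupleImpl.build(new ArrayList<>()); // empty list: project nothing
    RequestedTuple some = RequestedTupleImpl.build(parsedCols);        // project only the listed columns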
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
index 3587e2711..88ccd3c41 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
@@ -22,33 +22,42 @@ import static org.junit.Assert.fail;
import java.util.List;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+/**
+ * Test the "columns" array mechanism integrated with the scan schema
+ * orchestrator, including simulated reading of data.
+ */
+
+@Category(RowSetTests.class)
public class TestColumnsArray extends SubOperatorTest {
- /**
- * Test columns array. The table must be able to support it by having a
- * matching column.
- */
+ private static class MockScanner {
+ ScanSchemaOrchestrator scanner;
+ ReaderSchemaOrchestrator reader;
+ ResultSetLoader loader;
+ }
- @Test
- public void testColumnsArray() {
+ private MockScanner buildScanner(List<SchemaPath> projList) {
+
+ MockScanner mock = new MockScanner();
// Set up the file metadata manager
@@ -64,21 +73,19 @@ public class TestColumnsArray extends SubOperatorTest {
// Configure the schema orchestrator
- ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator());
- scanner.withMetadata(metadataManager);
- scanner.addParser(colsManager.projectionParser());
- scanner.addResolver(colsManager.resolver());
+ mock.scanner = new ScanSchemaOrchestrator(fixture.allocator());
+ mock.scanner.withMetadata(metadataManager);
+ mock.scanner.addParser(colsManager.projectionParser());
+ mock.scanner.addResolver(colsManager.resolver());
- // SELECT filename, columns, dir0 ...
+ // SELECT <proj list> ...
- scanner.build(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
- ColumnsArrayManager.COLUMNS_COL,
- ScanTestUtils.partitionColName(0)));
+ mock.scanner.build(projList);
// FROM z.csv
metadataManager.startFile(filePath);
- ReaderSchemaOrchestrator reader = scanner.startReader();
+ mock.reader = mock.scanner.startReader();
// Table schema (columns: VARCHAR[])
@@ -86,7 +93,25 @@ public class TestColumnsArray extends SubOperatorTest {
.addArray(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema();
- ResultSetLoader loader = reader.makeTableLoader(tableSchema);
+ mock.loader = mock.reader.makeTableLoader(tableSchema);
+
+ // First empty batch
+
+ mock.reader.defineSchema();
+ return mock;
+ }
+
+ /**
+ * Test columns array. The table must be able to support it by having a
+ * matching column.
+ */
+
+ @Test
+ public void testColumnsArray() {
+
+ MockScanner mock = buildScanner(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
+ ColumnsArrayManager.COLUMNS_COL,
+ ScanTestUtils.partitionColName(0)));
// Verify empty batch.
@@ -99,18 +124,18 @@ public class TestColumnsArray extends SubOperatorTest {
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ assertNotNull(mock.scanner.output());
+ RowSetUtilities.verify(expected,
+ fixture.wrap(mock.scanner.output()));
}
// Create a batch of data.
- reader.startBatch();
- loader.writer()
+ mock.reader.startBatch();
+ mock.loader.writer()
.addRow(new Object[] {new String[] {"fred", "flintstone"}})
.addRow(new Object[] {new String[] {"barney", "rubble"}});
- reader.endBatch();
+ mock.reader.endBatch();
// Verify
@@ -120,11 +145,100 @@ public class TestColumnsArray extends SubOperatorTest {
.addRow("z.csv", new String[] {"barney", "rubble"}, "x")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(mock.scanner.output()));
}
- scanner.close();
+ mock.scanner.close();
+ }
+
+ @Test
+ public void testWildcard() {
+
+ MockScanner mock = buildScanner(RowSetTestUtils.projectAll());
+
+ // Verify empty batch.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+ {
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .build();
+
+ assertNotNull(mock.scanner.output());
+ RowSetUtilities.verify(expected,
+ fixture.wrap(mock.scanner.output()));
+ }
+
+ // Create a batch of data.
+
+ mock.reader.startBatch();
+ mock.loader.writer()
+ .addRow(new Object[] {new String[] {"fred", "flintstone"}})
+ .addRow(new Object[] {new String[] {"barney", "rubble"}});
+ mock.reader.endBatch();
+
+ // Verify
+
+ {
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addSingleCol(new String[] {"fred", "flintstone"})
+ .addSingleCol(new String[] {"barney", "rubble"})
+ .build();
+
+ RowSetUtilities.verify(expected,
+ fixture.wrap(mock.scanner.output()));
+ }
+
+ mock.scanner.close();
+ }
+
+ @Test
+ public void testWildcardAndFileMetadata() {
+
+ MockScanner mock = buildScanner(RowSetTestUtils.projectList(
+ ScanTestUtils.FILE_NAME_COL,
+ SchemaPath.DYNAMIC_STAR,
+ ScanTestUtils.partitionColName(0)));
+
+ // Verify empty batch.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("filename", MinorType.VARCHAR)
+ .addArray("columns", MinorType.VARCHAR)
+ .addNullable("dir0", MinorType.VARCHAR)
+ .buildSchema();
+ {
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .build();
+
+ assertNotNull(mock.scanner.output());
+ RowSetUtilities.verify(expected,
+ fixture.wrap(mock.scanner.output()));
+ }
+
+ // Create a batch of data.
+
+ mock.reader.startBatch();
+ mock.loader.writer()
+ .addRow(new Object[] {new String[] {"fred", "flintstone"}})
+ .addRow(new Object[] {new String[] {"barney", "rubble"}});
+ mock.reader.endBatch();
+
+ // Verify
+
+ {
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("z.csv", new String[] {"fred", "flintstone"}, "x")
+ .addRow("z.csv", new String[] {"barney", "rubble"}, "x")
+ .build();
+
+ RowSetUtilities.verify(expected,
+ fixture.wrap(mock.scanner.output()));
+ }
+
+ mock.scanner.close();
}
private ScanSchemaOrchestrator buildScan(List<SchemaPath> cols) {
@@ -160,6 +274,7 @@ public class TestColumnsArray extends SubOperatorTest {
try {
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(tableSchema);
+ reader.defineSchema();
fail();
} catch (IllegalStateException e) {
// Expected
@@ -180,6 +295,7 @@ public class TestColumnsArray extends SubOperatorTest {
try {
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(tableSchema);
+ reader.defineSchema();
fail();
} catch (IllegalStateException e) {
// Expected
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
index 4f32b56e0..e7a0188cd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -46,7 +47,7 @@ import org.apache.drill.test.SubOperatorTest;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -54,6 +55,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
* Test the columns-array specific behavior in the columns scan framework.
*/
+@Category(RowSetTests.class)
public class TestColumnsArrayFramework extends SubOperatorTest {
private static final Path MOCK_FILE_PATH = new Path("file:/w/x/y/z.csv");
@@ -101,7 +103,7 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
@Override
public boolean open(ColumnsSchemaNegotiator negotiator) {
this.negotiator = negotiator;
- negotiator.setTableSchema(schema);
+ negotiator.setTableSchema(schema, true);
negotiator.build();
return true;
}
@@ -115,6 +117,11 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
public void close() { }
}
+ /**
+ * Test including a column other than "columns". Occurs when
+ * using implicit columns.
+ */
+
@Test
public void testNonColumnsProjection() {
@@ -143,6 +150,10 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
scanFixture.close();
}
+ /**
+ * Test projecting just the `columns` column.
+ */
+
@Test
public void testColumnsProjection() {
@@ -171,6 +182,10 @@ public class TestColumnsArrayFramework extends SubOperatorTest {
scanFixture.close();
}
+ /**
+ * Test including a specific index of `columns` such as
+ * `columns`[1].
+ */
@Test
public void testColumnsIndexProjection() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
index e2c8a21d3..d1e91a283 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
@@ -35,9 +36,10 @@ import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
import org.apache.drill.test.SubOperatorTest;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+@Category(RowSetTests.class)
public class TestColumnsArrayParser extends SubOperatorTest {
/**
@@ -255,5 +257,45 @@ public class TestColumnsArrayParser extends SubOperatorTest {
assertEquals(FileMetadataColumn.ID, scanProj.columns().get(2).nodeType());
}
- // TODO: Test Columns element projection
+ /**
+ * If a query is of the form:
+ * <pre><code>
+ * select * from dfs.`multilevel/csv` where columns[1] < 1000
+ * </code></pre>
+ * Then the projection list passed to the scan operator
+ * includes both the wildcard and the `columns` array.
+ * The scan can ignore the wildcard and project just the `columns` array.
+ */
+
+ @Test
+ public void testWildcardAndColumns() {
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(
+ SchemaPath.DYNAMIC_STAR,
+ ColumnsArrayManager.COLUMNS_COL),
+ ScanTestUtils.parsers(new ColumnsArrayParser(true)));
+
+ assertFalse(scanProj.projectAll());
+ assertEquals(2, scanProj.requestedCols().size());
+
+ assertEquals(1, scanProj.columns().size());
+ assertEquals(ColumnsArrayManager.COLUMNS_COL, scanProj.columns().get(0).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumnsArrayColumn.ID, scanProj.columns().get(0).nodeType());
+ }
+
+ @Test
+ public void testColumnsAsMap() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList("columns.x"),
+ ScanTestUtils.parsers(new ColumnsArrayParser(true)));
+ fail();
+ }
+ catch (UserException e) {
+ // Expected
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java
deleted file mode 100644
index 330a66115..000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadata;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumnDefn;
-import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader;
-import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
-import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-public class TestConstantColumnLoader extends SubOperatorTest {
-
- private static class DummyColumn implements ConstantColumnSpec {
-
- private String name;
- private MaterializedField schema;
- private String value;
-
- public DummyColumn(String name, MajorType type, String value) {
- this.name = name;
- this.schema = MaterializedField.create(name, type);
- this.value = value;
- }
-
- @Override
- public String name() { return name; }
-
- @Override
- public MaterializedField schema() { return schema; }
-
- @Override
- public String value() { return value; }
- }
-
- /**
- * Test the static column loader using one column of each type.
- * The null column is of type int, but the associated value is of
- * type string. This is a bit odd, but works out because we detect that
- * the string value is null and call setNull on the writer, and avoid
- * using the actual data.
- */
-
- @Test
- public void testConstantColumnLoader() {
-
- MajorType aType = MajorType.newBuilder()
- .setMinorType(MinorType.VARCHAR)
- .setMode(DataMode.REQUIRED)
- .build();
- MajorType bType = MajorType.newBuilder()
- .setMinorType(MinorType.VARCHAR)
- .setMode(DataMode.OPTIONAL)
- .build();
-
- List<ConstantColumnSpec> defns = new ArrayList<>();
- defns.add(
- new DummyColumn("a", aType, "a-value" ));
- defns.add(
- new DummyColumn("b", bType, "b-value" ));
-
- ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
- ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
-
- // Create a batch
-
- staticLoader.load(2);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("a", aType)
- .add("b", bType)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow("a-value", "b-value")
- .addRow("a-value", "b-value")
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
- staticLoader.close();
- }
-
- @Test
- public void testFileMetadata() {
-
- FileMetadata fileInfo = new FileMetadata(new Path("hdfs:///w/x/y/z.csv"), new Path("hdfs:///w"));
- List<ConstantColumnSpec> defns = new ArrayList<>();
- FileMetadataColumnDefn iDefn = new FileMetadataColumnDefn(
- ScanTestUtils.SUFFIX_COL, ImplicitFileColumns.SUFFIX);
- FileMetadataColumn iCol = new FileMetadataColumn(ScanTestUtils.SUFFIX_COL,
- iDefn, fileInfo, null, 0);
- defns.add(iCol);
-
- String partColName = ScanTestUtils.partitionColName(1);
- PartitionColumn pCol = new PartitionColumn(partColName, 1, fileInfo, null, 0);
- defns.add(pCol);
-
- ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
- ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
-
- // Create a batch
-
- staticLoader.load(2);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR)
- .addNullable(partColName, MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow("csv", "y")
- .addRow("csv", "y")
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
- staticLoader.close();
- }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
index 08aeed53f..a6de5e6f3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
@@ -33,9 +34,10 @@ import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
import org.apache.drill.test.SubOperatorTest;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+@Category(RowSetTests.class)
public class TestFileMetadataColumnParser extends SubOperatorTest {
@Test
@@ -135,8 +137,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
assertEquals(PartitionColumn.ID, scanProj.columns().get(0).nodeType());
}
+ /**
+ * Test wildcard expansion.
+ */
+
@Test
- public void testWildcard() {
+ public void testRevisedWildcard() {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
@@ -153,15 +159,45 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
}
/**
+ * Legacy (prior version) wildcard expansion always expands partition
+ * columns.
+ */
+
+ @Test
+ public void testLegacyWildcard() {
+ Path filePath = new Path("hdfs:///w/x/y/z.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ true, // Use legacy wildcard expansion
+ true, // Put partitions at end
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePath));
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ Lists.newArrayList(metadataManager.projectionParser()));
+
+ List<ColumnProjection> cols = scanProj.columns();
+ assertEquals(3, cols.size());
+ assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+ assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+ assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
+ }
+
+ /**
 * Combine wildcard and file metadata columns. The wildcard expands
* table columns but not metadata columns.
*/
@Test
- public void testWildcardAndFileMetadata() {
+ public void testLegacyWildcardAndFileMetadata() {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ true, // Use legacy wildcard expansion
+ false, // Put partitions at end
new Path("hdfs:///w"),
Lists.newArrayList(filePath));
@@ -173,10 +209,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Lists.newArrayList(metadataManager.projectionParser()));
List<ColumnProjection> cols = scanProj.columns();
- assertEquals(3, cols.size());
+ assertEquals(5, cols.size());
assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
assertEquals(FileMetadataColumn.ID, cols.get(1).nodeType());
assertEquals(FileMetadataColumn.ID, cols.get(2).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(4).nodeType());
}
/**
@@ -185,10 +223,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
*/
@Test
- public void testWildcardAndFileMetadataMixed() {
+ public void testLegacyWildcardAndFileMetadataMixed() {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
+ true, // Use legacy wildcard expansion
+ false, // Put partitions at end
new Path("hdfs:///w"),
Lists.newArrayList(filePath));
@@ -200,18 +240,25 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
Lists.newArrayList(metadataManager.projectionParser()));
List<ColumnProjection> cols = scanProj.columns();
- assertEquals(3, cols.size());
+ assertEquals(5, cols.size());
assertEquals(FileMetadataColumn.ID, cols.get(0).nodeType());
assertEquals(UnresolvedColumn.WILDCARD, cols.get(1).nodeType());
assertEquals(FileMetadataColumn.ID, cols.get(2).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(4).nodeType());
}
/**
- * Include both a wildcard and a partition column.
+ * Include both a wildcard and a partition column. The wildcard, in
+ * legacy mode, will create partition columns for any partitions not
+ * mentioned in the project list.
+ * <p>
+ * Tests the proposed functionality: include only the requested partition
+ * columns.
*/
@Test
- public void testWildcardAndPartition() {
+ public void testRevisedWildcardAndPartition() {
Path filePath = new Path("hdfs:///w/x/y/z.csv");
FileMetadataManager metadataManager = new FileMetadataManager(
fixture.getOptionManager(),
@@ -229,6 +276,113 @@ public class TestFileMetadataColumnParser extends SubOperatorTest {
assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
}
+ @Test
+ public void testLegacyWildcardAndPartition() {
+ Path filePath = new Path("hdfs:///w/x/y/z.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ true, // Use legacy wildcard expansion
+ true, // Put partitions at end
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePath));
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+ ScanTestUtils.partitionColName(8)),
+ Lists.newArrayList(metadataManager.projectionParser()));
+
+ List<ColumnProjection> cols = scanProj.columns();
+ assertEquals(4, cols.size());
+ assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+ assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+ assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+ assertEquals(8, ((PartitionColumn) cols.get(3)).partition());
+ }
+
+ @Test
+ public void testPreferredPartitionExpansion() {
+ Path filePath = new Path("hdfs:///w/x/y/z.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ true, // Use legacy wildcard expansion
+ false, // Put partitions at end
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePath));
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+ ScanTestUtils.partitionColName(8)),
+ Lists.newArrayList(metadataManager.projectionParser()));
+
+ List<ColumnProjection> cols = scanProj.columns();
+ assertEquals(4, cols.size());
+ assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+ assertEquals(8, ((PartitionColumn) cols.get(1)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+ assertEquals(0, ((PartitionColumn) cols.get(2)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(3).nodeType());
+ assertEquals(1, ((PartitionColumn) cols.get(3)).partition());
+ }
+
+ /**
+ * Test a case like:<br>
+ * <code>SELECT *, dir1 FROM ...</code><br>
+ * The projection list includes "dir1". The wildcard will
+ * fill in "dir0".
+ */
+
+ @Test
+ public void testLegacyWildcardAndPartitionWithOverlap() {
+ Path filePath = new Path("hdfs:///w/x/y/z.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ true, // Use legacy wildcard expansion
+ true, // Put partitions at end
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePath));
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+ ScanTestUtils.partitionColName(1)),
+ Lists.newArrayList(metadataManager.projectionParser()));
+
+ List<ColumnProjection> cols = scanProj.columns();
+ assertEquals(3, cols.size());
+ assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+ assertEquals(0, ((PartitionColumn) cols.get(1)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+ assertEquals(1, ((PartitionColumn) cols.get(2)).partition());
+ }
+
+ @Test
+ public void testPreferredWildcardExpansionWithOverlap() {
+ Path filePath = new Path("hdfs:///w/x/y/z.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ true, // Use legacy wildcard expansion
+ false, // Put partitions at end
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePath));
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR,
+ ScanTestUtils.partitionColName(1)),
+ Lists.newArrayList(metadataManager.projectionParser()));
+
+ List<ColumnProjection> cols = scanProj.columns();
+ assertEquals(3, cols.size());
+ assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType());
+ assertEquals(PartitionColumn.ID, cols.get(1).nodeType());
+ assertEquals(1, ((PartitionColumn) cols.get(1)).partition());
+ assertEquals(PartitionColumn.ID, cols.get(2).nodeType());
+ assertEquals(0, ((PartitionColumn) cols.get(2)).partition());
+ }
+
/**
* Verify that names that look like metadata columns, but appear
* to be maps or arrays, are not interpreted as metadata. That is,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
index 9161932d4..314bc2a13 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.util.List;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadata;
@@ -46,9 +47,10 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.SubOperatorTest;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+@Category(RowSetTests.class)
public class TestFileMetadataProjection extends SubOperatorTest {
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index a10e7665a..2fdf3e637 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -32,7 +33,7 @@ import org.apache.drill.exec.physical.impl.scan.TestScanOperatorExec.AbstractSca
import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework;
import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderCreator;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
@@ -50,6 +51,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
/**
* Tests the file metadata extensions to the file operator framework.
@@ -57,6 +59,7 @@ import org.junit.Test;
* verified the underlying mechanisms.
*/
+@Category(RowSetTests.class)
public class TestFileScanFramework extends SubOperatorTest {
private static final String MOCK_FILE_NAME = "foo.csv";
@@ -117,7 +120,7 @@ public class TestFileScanFramework extends SubOperatorTest {
}
}
- public static class FileScanOpFixture extends BaseFileScanOpFixture implements FileReaderCreator {
+ public static class FileScanOpFixture extends BaseFileScanOpFixture implements FileReaderFactory {
protected final List<MockFileReader> readers = new ArrayList<>();
protected Iterator<MockFileReader> readerIter;
@@ -252,7 +255,7 @@ public class TestFileScanFramework extends SubOperatorTest {
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR, 10)
.buildSchema();
- schemaNegotiator.setTableSchema(schema);
+ schemaNegotiator.setTableSchema(schema, true);
tableLoader = schemaNegotiator.build();
return true;
}
@@ -486,7 +489,7 @@ public class TestFileScanFramework extends SubOperatorTest {
.add("b", MinorType.INT)
.resumeSchema()
.buildSchema();
- schemaNegotiator.setTableSchema(schema);
+ schemaNegotiator.setTableSchema(schema, true);
tableLoader = schemaNegotiator.build();
return true;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java
deleted file mode 100644
index a41305221..000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertSame;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
-import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
-import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
-import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.Test;
-
-public class TestNullColumnLoader extends SubOperatorTest {
-
- private ResolvedNullColumn makeNullCol(String name, MajorType nullType) {
-
- // For this test, we don't need the projection, so just
- // set it to null.
-
- return new ResolvedNullColumn(name, nullType, null, 0);
- }
-
- private ResolvedNullColumn makeNullCol(String name) {
- return makeNullCol(name, null);
- }
-
- /**
- * Test the simplest case: default null type, nothing in the vector
- * cache. Specify no column type, the special NULL type, or a
- * predefined type. Output types should be set accordingly.
- */
-
- @Test
- public void testBasics() {
-
- List<ResolvedNullColumn> defns = new ArrayList<>();
- defns.add(makeNullCol("unspecified", null));
- defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL)));
- defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR)));
- defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR)));
- defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR)));
-
- ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false);
-
- // Create a batch
-
- VectorContainer output = staticLoader.load(2);
-
- // Verify values and types
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
- .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
- .addNullable("specifiedOpt", MinorType.VARCHAR)
- .addNullable("specifiedReq", MinorType.VARCHAR)
- .addArray("specifiedArray", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(null, null, null, null, new String[] {})
- .addRow(null, null, null, null, new String[] {})
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(output));
- staticLoader.close();
- }
-
- @Test
- public void testCustomNullType() {
-
- List<ResolvedNullColumn> defns = new ArrayList<>();
- defns.add(makeNullCol("unspecified", null));
- defns.add(makeNullCol("nullType", MajorType.newBuilder()
- .setMinorType(MinorType.NULL)
- .setMode(DataMode.OPTIONAL)
- .build()));
- defns.add(makeNullCol("nullTypeReq", MajorType.newBuilder()
- .setMinorType(MinorType.NULL)
- .setMode(DataMode.REQUIRED)
- .build()));
-
- // Null type array does not make sense, so is not tested.
-
- ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- MajorType nullType = MajorType.newBuilder()
- .setMinorType(MinorType.VARCHAR)
- .setMode(DataMode.OPTIONAL)
- .build();
- NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
-
- // Create a batch
-
- VectorContainer output = staticLoader.load(2);
-
- // Verify values and types
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("unspecified", nullType)
- .add("nullType", nullType)
- .add("nullTypeReq", nullType)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(null, null, null)
- .addRow(null, null, null)
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(output));
- staticLoader.close();
- }
-
- @Test
- public void testCachedTypesMapToNullable() {
-
- List<ResolvedNullColumn> defns = new ArrayList<>();
- defns.add(makeNullCol("req"));
- defns.add(makeNullCol("opt"));
- defns.add(makeNullCol("rep"));
- defns.add(makeNullCol("unk"));
-
- // Populate the cache with a column of each mode.
-
- ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
- cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
- ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
- ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
-
- // Use nullable Varchar for unknown null columns.
-
- MajorType nullType = MajorType.newBuilder()
- .setMinorType(MinorType.VARCHAR)
- .setMode(DataMode.OPTIONAL)
- .build();
- NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
-
- // Create a batch
-
- VectorContainer output = staticLoader.load(2);
-
- // Verify vectors are reused
-
- assertSame(opt, output.getValueVector(1).getValueVector());
- assertSame(rep, output.getValueVector(2).getValueVector());
-
- // Verify values and types
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .addNullable("req", MinorType.FLOAT8)
- .addNullable("opt", MinorType.FLOAT8)
- .addArray("rep", MinorType.FLOAT8)
- .addNullable("unk", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(null, null, new int[] { }, null)
- .addRow(null, null, new int[] { }, null)
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(output));
- staticLoader.close();
- }
-
- @Test
- public void testCachedTypesAllowRequired() {
-
- List<ResolvedNullColumn> defns = new ArrayList<>();
- defns.add(makeNullCol("req"));
- defns.add(makeNullCol("opt"));
- defns.add(makeNullCol("rep"));
- defns.add(makeNullCol("unk"));
-
- // Populate the cache with a column of each mode.
-
- ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
- cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
- ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
- ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
-
- // Use nullable Varchar for unknown null columns.
-
- MajorType nullType = MajorType.newBuilder()
- .setMinorType(MinorType.VARCHAR)
- .setMode(DataMode.OPTIONAL)
- .build();
- NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true);
-
- // Create a batch
-
- VectorContainer output = staticLoader.load(2);
-
- // Verify vectors are reused
-
- assertSame(opt, output.getValueVector(1).getValueVector());
- assertSame(rep, output.getValueVector(2).getValueVector());
-
- // Verify values and types
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("req", MinorType.FLOAT8)
- .addNullable("opt", MinorType.FLOAT8)
- .addArray("rep", MinorType.FLOAT8)
- .addNullable("unk", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(0.0, null, new int[] { }, null)
- .addRow(0.0, null, new int[] { }, null)
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(output));
- staticLoader.close();
- }
-
- @Test
- public void testNullColumnBuilder() {
-
- ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
-
- builder.add("unspecified");
- builder.add("nullType", Types.optional(MinorType.NULL));
- builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR));
- builder.add("specifiedReq", Types.required(MinorType.VARCHAR));
- builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR));
- builder.build(cache);
-
- // Create a batch
-
- builder.load(2);
-
- // Verify values and types
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
- .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
- .addNullable("specifiedOpt", MinorType.VARCHAR)
- .addNullable("specifiedReq", MinorType.VARCHAR)
- .addArray("specifiedArray", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(null, null, null, null, new String[] {})
- .addRow(null, null, null, null, new String[] {})
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(builder.output()));
- builder.close();
- }
-}
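The deleted tests above pin down the null-column type rules: a column with no declared type, or with the internal NULL type, falls back to the caller-supplied null type (or the loader default when none is given), and a REQUIRED declared type is softened to OPTIONAL unless required columns are explicitly allowed. The following is a minimal, self-contained sketch of those rules in plain Java; the names (NullTypeRules, TypeSpec, resolveNullType) and the assumed default type are invented for illustration and are not Drill APIs.

// Hypothetical illustration only -- not Drill code. The default type below is an
// assumption; NullColumnLoader.DEFAULT_NULL_TYPE is whatever Drill defines.
final class NullTypeRules {

  enum Mode { REQUIRED, OPTIONAL, REPEATED }

  static final class TypeSpec {
    final String minorType;
    final Mode mode;
    TypeSpec(String minorType, Mode mode) {
      this.minorType = minorType;
      this.mode = mode;
    }
  }

  // Assumed stand-in for the loader's built-in default null type.
  static final TypeSpec DEFAULT_NULL_TYPE = new TypeSpec("INT", Mode.OPTIONAL);

  static TypeSpec resolveNullType(TypeSpec declared, TypeSpec callerNullType,
                                  boolean allowRequired) {
    // Unspecified type, or the internal NULL type: use the caller's preferred
    // null type if given, otherwise the loader default.
    if (declared == null || "NULL".equals(declared.minorType)) {
      return callerNullType != null ? callerNullType : DEFAULT_NULL_TYPE;
    }
    // A REQUIRED declared type is softened to OPTIONAL unless required
    // "null" columns (filled with default values) are explicitly allowed.
    if (declared.mode == Mode.REQUIRED && !allowRequired) {
      return new TypeSpec(declared.minorType, Mode.OPTIONAL);
    }
    return declared;
  }
}

Under these rules, the testBasics case maps the required VARCHAR column to nullable VARCHAR, and testCustomNullType maps both NULL-typed columns to the supplied nullable VARCHAR, which is what the expected schemas above assert.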
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java
deleted file mode 100644
index 079455789..000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java
+++ /dev/null
@@ -1,459 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
-import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
-import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.Test;
-
-import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
-import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
-import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
-
-
-/**
- * Test the row batch merger by merging two batches. Tests both the
- * "direct" and "exchange" cases. Direct means that the output container
- * contains the source vector directly: they are the same vectors.
- * Exchange means we have two vectors, but we swap the underlying
- * DrillBufs to effectively shift data from source to destination
- * vector.
- */
-
-public class TestRowBatchMerger extends SubOperatorTest {
-
- public static class RowSetSource implements VectorSource {
-
- private SingleRowSet rowSet;
-
- public RowSetSource(SingleRowSet rowSet) {
- this.rowSet = rowSet;
- }
-
- public RowSet rowSet() { return rowSet; }
-
- public void clear() {
- rowSet.clear();
- }
-
- @Override
- public ValueVector vector(int index) {
- return rowSet.container().getValueVector(index).getValueVector();
- }
- }
-
- private RowSetSource makeFirst() {
- BatchSchema firstSchema = new SchemaBuilder()
- .add("d", MinorType.VARCHAR)
- .add("a", MinorType.INT)
- .build();
- return new RowSetSource(
- fixture.rowSetBuilder(firstSchema)
- .addRow("barney", 10)
- .addRow("wilma", 20)
- .build());
- }
-
- private RowSetSource makeSecond() {
- BatchSchema secondSchema = new SchemaBuilder()
- .add("b", MinorType.INT)
- .add("c", MinorType.VARCHAR)
- .build();
- return new RowSetSource(
- fixture.rowSetBuilder(secondSchema)
- .addRow(1, "foo.csv")
- .addRow(2, "foo.csv")
- .build());
- }
-
- public static class TestProjection extends ResolvedColumn {
-
- public TestProjection(VectorSource source, int sourceIndex) {
- super(source, sourceIndex);
- }
-
- @Override
- public String name() { return null; }
-
- @Override
- public int nodeType() { return -1; }
-
- @Override
- public MaterializedField schema() { return null; }
- }
-
- @Test
- public void testSimpleFlat() {
-
- // Create the first batch
-
- RowSetSource first = makeFirst();
-
- // Create the second batch
-
- RowSetSource second = makeSecond();
-
- ResolvedRow resolvedTuple = new ResolvedRow(null);
- resolvedTuple.add(new TestProjection(first, 1));
- resolvedTuple.add(new TestProjection(second, 0));
- resolvedTuple.add(new TestProjection(second, 1));
- resolvedTuple.add(new TestProjection(first, 0));
-
- // Do the merge
-
- VectorContainer output = new VectorContainer(fixture.allocator());
- resolvedTuple.project(null, output);
- output.setRecordCount(first.rowSet().rowCount());
- RowSet result = fixture.wrap(output);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.INT)
- .add("c", MinorType.VARCHAR)
- .add("d", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(10, 1, "foo.csv", "barney")
- .addRow(20, 2, "foo.csv", "wilma")
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(result);
- }
-
- @Test
- public void testImplicitFlat() {
-
- // Create the first batch
-
- RowSetSource first = makeFirst();
-
- // Create the second batch
-
- RowSetSource second = makeSecond();
-
- ResolvedRow resolvedTuple = new ResolvedRow(null);
- resolvedTuple.add(new TestProjection(resolvedTuple, 1));
- resolvedTuple.add(new TestProjection(second, 0));
- resolvedTuple.add(new TestProjection(second, 1));
- resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-
- // Do the merge
-
- VectorContainer output = new VectorContainer(fixture.allocator());
- resolvedTuple.project(first.rowSet().container(), output);
- output.setRecordCount(first.rowSet().rowCount());
- RowSet result = fixture.wrap(output);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.INT)
- .add("c", MinorType.VARCHAR)
- .add("d", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(10, 1, "foo.csv", "barney")
- .addRow(20, 2, "foo.csv", "wilma")
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(result);
- }
-
- @Test
- public void testFlatWithNulls() {
-
- // Create the first batch
-
- RowSetSource first = makeFirst();
-
- // Create null columns
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
-
- ResolvedRow resolvedTuple = new ResolvedRow(builder);
- resolvedTuple.add(new TestProjection(resolvedTuple, 1));
- resolvedTuple.add(resolvedTuple.nullBuilder().add("null1"));
- resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
- resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-
- // Build the null values
-
- ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- builder.build(cache);
- builder.load(first.rowSet().rowCount());
-
- // Do the merge
-
- VectorContainer output = new VectorContainer(fixture.allocator());
- resolvedTuple.project(first.rowSet().container(), output);
- output.setRecordCount(first.rowSet().rowCount());
- RowSet result = fixture.wrap(output);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .addNullable("null1", MinorType.INT)
- .addNullable("null2", MinorType.VARCHAR)
- .add("d", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(10, null, null, "barney")
- .addRow(20, null, null, "wilma")
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(result);
- builder.close();
- }
-
- /**
- * Test the ability to create maps from whole cloth if requested in
- * the projection list but not available from the data
- * source.
- */
-
- @Test
- public void testNullMaps() {
-
- // Create the first batch
-
- RowSetSource first = makeFirst();
-
- // Create null columns
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow resolvedTuple = new ResolvedRow(builder);
-
- resolvedTuple.add(new TestProjection(resolvedTuple, 1));
-
- ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1");
- ResolvedTuple nullMap = nullMapCol.members();
- nullMap.add(nullMap.nullBuilder().add("null1"));
- nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
-
- ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2");
- ResolvedTuple nullMap2 = nullMapCol2.members();
- nullMap2.add(nullMap2.nullBuilder().add("null3"));
- nullMap.add(nullMapCol2);
-
- resolvedTuple.add(nullMapCol);
- resolvedTuple.add(new TestProjection(resolvedTuple, 0));
-
- // Build the null values
-
- ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- resolvedTuple.buildNulls(cache);
-
- // LoadNulls
-
- resolvedTuple.loadNulls(first.rowSet().rowCount());
-
- // Do the merge
-
- VectorContainer output = new VectorContainer(fixture.allocator());
- resolvedTuple.project(first.rowSet().container(), output);
- resolvedTuple.setRowCount(first.rowSet().rowCount());
- RowSet result = fixture.wrap(output);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .addMap("map1")
- .addNullable("null1", MinorType.INT)
- .addNullable("null2", MinorType.VARCHAR)
- .addMap("map2")
- .addNullable("null3", MinorType.INT)
- .resumeMap()
- .resumeSchema()
- .add("d", MinorType.VARCHAR)
- .build();
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(10, mapValue(null, null, singleMap(null)), "barney")
- .addRow(20, mapValue(null, null, singleMap(null)), "wilma")
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(result);
- resolvedTuple.close();
- }
-
- /**
- * Test that the merger mechanism can rewrite a map to include
- * projected null columns.
- */
-
- @Test
- public void testMapRevision() {
-
- // Create the first batch
-
- BatchSchema inputSchema = new SchemaBuilder()
- .add("b", MinorType.VARCHAR)
- .addMap("a")
- .add("c", MinorType.INT)
- .resumeSchema()
- .build();
- RowSetSource input = new RowSetSource(
- fixture.rowSetBuilder(inputSchema)
- .addRow("barney", singleMap(10))
- .addRow("wilma", singleMap(20))
- .build());
-
- // Create mappings
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow resolvedTuple = new ResolvedRow(builder);
-
- resolvedTuple.add(new TestProjection(resolvedTuple, 0));
- ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
- inputSchema.getColumn(1), 1);
- resolvedTuple.add(mapCol);
- ResolvedTuple map = mapCol.members();
- map.add(new TestProjection(map, 0));
- map.add(map.nullBuilder().add("null1"));
-
- // Build the null values
-
- ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- resolvedTuple.buildNulls(cache);
-
- // LoadNulls
-
- resolvedTuple.loadNulls(input.rowSet().rowCount());
-
- // Do the merge
-
- VectorContainer output = new VectorContainer(fixture.allocator());
- resolvedTuple.project(input.rowSet().container(), output);
- output.setRecordCount(input.rowSet().rowCount());
- RowSet result = fixture.wrap(output);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("b", MinorType.VARCHAR)
- .addMap("a")
- .add("c", MinorType.INT)
- .addNullable("null1", MinorType.INT)
- .resumeSchema()
- .build();
- RowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow("barney", mapValue(10, null))
- .addRow("wilma", mapValue(20, null))
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(result);
- }
-
- /**
- * Test that the merger mechanism can rewrite a map array to include
- * projected null columns.
- */
-
- @Test
- public void testMapArrayRevision() {
-
- // Create the first batch
-
- BatchSchema inputSchema = new SchemaBuilder()
- .add("b", MinorType.VARCHAR)
- .addMapArray("a")
- .add("c", MinorType.INT)
- .resumeSchema()
- .build();
- RowSetSource input = new RowSetSource(
- fixture.rowSetBuilder(inputSchema)
- .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12)))
- .addRow("wilma", mapArray(singleMap(20), singleMap(21)))
- .build());
-
- // Create mappings
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow resolvedTuple = new ResolvedRow(builder);
-
- resolvedTuple.add(new TestProjection(resolvedTuple, 0));
- ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
- inputSchema.getColumn(1), 1);
- resolvedTuple.add(mapCol);
- ResolvedTuple map = mapCol.members();
- map.add(new TestProjection(map, 0));
- map.add(map.nullBuilder().add("null1"));
-
- // Build the null values
-
- ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- resolvedTuple.buildNulls(cache);
-
- // LoadNulls
-
- resolvedTuple.loadNulls(input.rowSet().rowCount());
-
- // Do the merge
-
- VectorContainer output = new VectorContainer(fixture.allocator());
- resolvedTuple.project(input.rowSet().container(), output);
- output.setRecordCount(input.rowSet().rowCount());
- RowSet result = fixture.wrap(output);
-
- // Verify
-
- BatchSchema expectedSchema = new SchemaBuilder()
- .add("b", MinorType.VARCHAR)
- .addMapArray("a")
- .add("c", MinorType.INT)
- .addNullable("null1", MinorType.INT)
- .resumeSchema()
- .build();
- RowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow("barney", mapArray(
- mapValue(10, null), mapValue(11, null), mapValue(12, null)))
- .addRow("wilma", mapArray(
- mapValue(20, null), mapValue(21, null)))
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(result);
- }
-
-}
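The class removed above documents the two merge strategies: "direct", where the output container holds the very same vectors as the source, and "exchange", where data is shifted by swapping the underlying DrillBufs between two vectors. A rough, self-contained sketch of the direct case follows; the VectorSource interface mirrors the single vector(int) accessor used in the deleted test, while the remaining names are invented for illustration and are not Drill's classes.

import java.util.ArrayList;
import java.util.List;

// Sketch only: generic stand-ins, not Drill's vector classes.
final class DirectMergeSketch {

  // Mirrors the VectorSource interface used in the deleted test: hand back the
  // vector held at a given position within this source.
  interface VectorSource<V> {
    V vector(int index);
  }

  // A (source, index) pair, analogous to the TestProjection helper above.
  static final class Projection<V> {
    final VectorSource<V> source;
    final int sourceIndex;
    Projection(VectorSource<V> source, int sourceIndex) {
      this.source = source;
      this.sourceIndex = sourceIndex;
    }
  }

  // "Direct" merge: the output simply references the same vector objects in
  // projection order; nothing is copied, which is why the tests can assert
  // object identity on the merged container.
  static <V> List<V> project(List<Projection<V>> projections) {
    List<V> output = new ArrayList<>(projections.size());
    for (Projection<V> p : projections) {
      output.add(p.source.vector(p.sourceIndex));
    }
    return output;
  }
}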
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java
index 912a7172e..03587cb08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.scan;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.AbstractSubScan;
@@ -34,6 +35,7 @@ import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import io.netty.buffer.DrillBuf;
@@ -42,6 +44,7 @@ import io.netty.buffer.DrillBuf;
* set follows the same semantics as the original set.
*/
+@Category(RowSetTests.class)
public class TestScanBatchWriters extends SubOperatorTest {
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java
deleted file mode 100644
index 558f761c9..000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
-import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
-import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.record.metadata.ProjectionType;
-import org.apache.drill.test.SubOperatorTest;
-import org.junit.Test;
-
-/**
- * Test the projection step performed at the level of the scan as a whole,
- * before any knowledge of "implicit" table columns or of the specific table schema.
- */
-
-public class TestScanLevelProjection extends SubOperatorTest {
-
- /**
- * Basic test: select a set of columns (a, b, c) when the
- * data source has an early schema of (a, c, d). Columns (a, c) come
- * from the table, (b) resolves to a null column, and (d) is left unprojected.
- */
-
- @Test
- public void testBasics() {
-
- // Simulate SELECT a, b, c ...
- // Build the projection plan and verify
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a", "b", "c"),
- ScanTestUtils.parsers());
- assertFalse(scanProj.projectAll());
- assertFalse(scanProj.projectNone());
-
- assertEquals(3, scanProj.requestedCols().size());
- assertEquals("a", scanProj.requestedCols().get(0).rootName());
- assertEquals("b", scanProj.requestedCols().get(1).rootName());
- assertEquals("c", scanProj.requestedCols().get(2).rootName());
-
- assertEquals(3, scanProj.columns().size());
- assertEquals("a", scanProj.columns().get(0).name());
- assertEquals("b", scanProj.columns().get(1).name());
- assertEquals("c", scanProj.columns().get(2).name());
-
- // Verify column type
-
- assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
- }
-
- @Test
- public void testMap() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
- ScanTestUtils.parsers());
- assertFalse(scanProj.projectAll());
- assertFalse(scanProj.projectNone());
-
- assertEquals(3, scanProj.columns().size());
- assertEquals("a", scanProj.columns().get(0).name());
- assertEquals("b", scanProj.columns().get(1).name());
- assertEquals("c", scanProj.columns().get(2).name());
-
- // Verify column type
-
- assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
-
- // Map structure
-
- RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
- assertTrue(a.isTuple());
- assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x"));
- assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y"));
- assertEquals(ProjectionType.UNPROJECTED, a.mapProjection().projectionType("z"));
-
- RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
- assertTrue(c.isSimple());
- }
-
- @Test
- public void testArray() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a[1]", "a[3]"),
- ScanTestUtils.parsers());
- assertFalse(scanProj.projectAll());
- assertFalse(scanProj.projectNone());
-
- assertEquals(1, scanProj.columns().size());
- assertEquals("a", scanProj.columns().get(0).name());
-
- // Verify column type
-
- assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
-
- // Map structure
-
- RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
- assertTrue(a.isArray());
- assertFalse(a.hasIndex(0));
- assertTrue(a.hasIndex(1));
- assertFalse(a.hasIndex(2));
- assertTrue(a.hasIndex(3));
- }
-
- /**
- * Simulate a SELECT * query by passing "**" (Drill's internal version
- * of the wildcard) as a column name.
- */
-
- @Test
- public void testWildcard() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- assertTrue(scanProj.projectAll());
- assertFalse(scanProj.projectNone());
- assertEquals(1, scanProj.requestedCols().size());
- assertTrue(scanProj.requestedCols().get(0).isDynamicStar());
-
- assertEquals(1, scanProj.columns().size());
- assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name());
-
- // Verify bindings
-
- assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName());
-
- // Verify column type
-
- assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
- }
-
- /**
- * Test an empty projection which occurs in a
- * SELECT COUNT(*) query.
- */
-
- @Test
- public void testEmptyProjection() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList(),
- ScanTestUtils.parsers());
-
- assertFalse(scanProj.projectAll());
- assertTrue(scanProj.projectNone());
- assertEquals(0, scanProj.requestedCols().size());
- }
-
- /**
- * Can't include both a wildcard and a column name.
- */
-
- @Test
- public void testErrorWildcardAndColumns() {
- try {
- new ScanLevelProjection(
- RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
- ScanTestUtils.parsers());
- fail();
- } catch (IllegalArgumentException e) {
- // Expected
- }
- }
-
- /**
- * Can't include both a column name and a wildcard.
- */
-
- @Test
- public void testErrorColumnAndWildcard() {
- try {
- new ScanLevelProjection(
- RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
- ScanTestUtils.parsers());
- fail();
- } catch (IllegalArgumentException e) {
- // Expected
- }
- }
-
- /**
- * Can't include a wildcard twice.
- * <p>
- * Note: Drill actually allows this, but the work should be done
- * in the project operator; scan should see at most one wildcard.
- */
-
- @Test
- public void testErrorTwoWildcards() {
- try {
- new ScanLevelProjection(
- RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
- ScanTestUtils.parsers());
- fail();
- } catch (UserException e) {
- // Expected
- }
- }
-}
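The deleted class covered scan-level projection parsing: an empty list means a SELECT COUNT(*)-style scan, a lone "**" means project everything, anything else is an explicit column list (including map members such as a.x and array indexes such as a[1]), and a wildcard mixed with columns or repeated is rejected. A compact, hypothetical classifier in plain Java is sketched below; it is not the ScanLevelProjection API, and it collapses the two error cases into one exception where Drill distinguishes them.

import java.util.List;

// Hypothetical sketch of the classification rules, not Drill's implementation.
final class ScanProjectionSketch {

  enum Kind { PROJECT_NONE, PROJECT_ALL, EXPLICIT }

  // Drill's internal spelling of the wildcard (SchemaPath.DYNAMIC_STAR).
  static final String WILDCARD = "**";

  static Kind classify(List<String> projectList) {
    if (projectList.isEmpty()) {
      return Kind.PROJECT_NONE;                     // SELECT COUNT(*)
    }
    long wildcards = projectList.stream().filter(WILDCARD::equals).count();
    if (wildcards == 0) {
      return Kind.EXPLICIT;                         // named columns, a.x, a[1], ...
    }
    if (wildcards > 1 || projectList.size() > 1) {
      // Wildcard plus columns, or a repeated wildcard: rejected at scan level.
      throw new IllegalArgumentException("Wildcard must appear alone: " + projectList);
    }
    return Kind.PROJECT_ALL;
  }
}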
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java
index 33752a746..386849bb6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
@@ -54,6 +55,7 @@ import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
/**
* Test of the scan operator framework. Here the focus is on the
@@ -66,6 +68,7 @@ import org.junit.Test;
* appear elsewhere.
*/
+@Category(RowSetTests.class)
public class TestScanOperatorExec extends SubOperatorTest {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestScanOperatorExec.class);
@@ -187,7 +190,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR, 10)
.buildSchema();
- schemaNegotiator.setTableSchema(schema);
+ schemaNegotiator.setTableSchema(schema, true);
tableLoader = schemaNegotiator.build();
return true;
}
@@ -213,7 +216,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
.add("a", MinorType.VARCHAR)
.addNullable("b", MinorType.VARCHAR, 10)
.buildSchema();
- schemaNegotiator.setTableSchema(schema);
+ schemaNegotiator.setTableSchema(schema, true);
schemaNegotiator.build();
tableLoader = schemaNegotiator.build();
return true;
@@ -1367,7 +1370,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.VARCHAR)
.buildSchema();
- schemaNegotiator.setTableSchema(schema);
+ schemaNegotiator.setTableSchema(schema, true);
tableLoader = schemaNegotiator.build();
return true;
}
@@ -1493,7 +1496,7 @@ public class TestScanOperatorExec extends SubOperatorTest {
TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
- schemaNegotiator.setTableSchema(schema);
+ schemaNegotiator.setTableSchema(schema, true);
tableLoader = schemaNegotiator.build();
return true;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
index d8c9a65ef..ef79b0ee6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java
@@ -22,12 +22,13 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
import org.apache.drill.exec.record.BatchSchema;
@@ -37,8 +38,9 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
/**
* Test the early-schema support of the scan orchestrator. "Early schema"
@@ -49,6 +51,7 @@ import org.junit.Test;
* that tests for lower-level components have already passed.
*/
+@Category(RowSetTests.class)
public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
/**
@@ -78,16 +81,17 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
- // Schema provided, so an empty batch is available to
- // send downstream.
+ // Simulate a first reader in a scan that can provide an
+ // empty batch to define schema.
{
+ reader.defineSchema();
SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
.build();
assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
// Create a batch of data.
@@ -107,8 +111,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(2, "wilma")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
// Second batch.
@@ -128,8 +132,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(4, "betty")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
// Explicit reader close. (All other tests are lazy, they
@@ -167,16 +171,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
- // Verify empty batch.
-
- {
- SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ // Don't bother with an empty batch here or in other tests.
+ // Simulates the second reader in a scan.
// Create a batch of data.
@@ -188,15 +184,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
- .addRow(1, "fred")
- .addRow(2, "wilma")
- .build();
+ SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
+ .addRow(1, "fred")
+ .addRow(2, "wilma")
+ .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
@@ -228,20 +222,10 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
- // Verify empty batch.
-
BatchSchema expectedSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.add("a", MinorType.INT)
.build();
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data.
@@ -253,17 +237,15 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow("fred", 1)
- .addRow("wilma", 2)
- .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("fred", 1)
+ .addRow("wilma", 2)
+ .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
- scanner.close();
+ scanner.close();
}
/**
@@ -294,21 +276,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
- // Verify empty batch
-
BatchSchema expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.addNullable("c", MinorType.INT)
.build();
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data.
@@ -320,15 +292,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(1, "fred", null)
- .addRow(2, "wilma", null)
- .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(1, "fred", null)
+ .addRow(2, "wilma", null)
+ .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
@@ -369,21 +339,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
- // Verify empty batch
-
BatchSchema expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.addNullable("c", MinorType.VARCHAR)
.build();
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data.
@@ -395,15 +355,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(1, "fred", null)
- .addRow(2, "wilma", null)
- .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(1, "fred", null)
+ .addRow(2, "wilma", null)
+ .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
@@ -440,19 +398,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
assertFalse(loader.writer().column("b").schema().isProjected());
- // Verify empty batch.
-
BatchSchema expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.build();
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data.
@@ -464,15 +412,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow(1)
- .addRow(2)
- .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(1)
+ .addRow(2)
+ .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
@@ -516,16 +462,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
BatchSchema expectedSchema = new SchemaBuilder()
.build();
- {
- // Expect an empty schema
-
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data.
@@ -545,8 +481,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow()
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
// Fast path to fill in empty rows
@@ -592,18 +528,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
reader.makeTableLoader(tableSchema);
- // Schema provided, so an empty batch is available to
- // send downstream.
-
- {
- SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
-
// Create a batch of data. Because there are no columns, it does
// not make sense to ready any rows.
@@ -616,8 +540,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
scanner.close();
@@ -650,19 +574,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
reader.makeTableLoader(tableSchema);
- // Verify initial empty batch.
-
BatchSchema expectedSchema = new SchemaBuilder()
.addNullable("a", MinorType.INT)
.build();
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data. Because there are no columns, it does
// not make sense to ready any rows.
@@ -672,16 +586,14 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .build();
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
+
/**
* The projection mechanism provides "type smoothing": null
* columns prefer the type of previously-seen non-null columns.
@@ -718,6 +630,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(table1Schema);
+ reader.defineSchema();
VectorContainer output = scanner.output();
tracker.trackSchema(output);
schemaVersion = tracker.schemaVersion();
@@ -737,6 +650,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.buildSchema();
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(table2Schema);
+ reader.defineSchema();
VectorContainer output = scanner.output();
tracker.trackSchema(output);
assertEquals(schemaVersion, tracker.schemaVersion());
@@ -756,6 +670,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.buildSchema();
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(table3Schema);
+ reader.defineSchema();
VectorContainer output = scanner.output();
tracker.trackSchema(output);
assertEquals(schemaVersion, tracker.schemaVersion());
@@ -776,6 +691,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.buildSchema();
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(table2Schema);
+ reader.defineSchema();
VectorContainer output = scanner.output();
tracker.trackSchema(output);
assertEquals(MinorType.BIGINT, output.getSchema().getColumn(0).getType().getMinorType());
@@ -843,8 +759,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(10, "fred")
.addRow(20, "wilma")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(projector.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(projector.output()));
}
{
// ... FROM table 2
@@ -874,8 +790,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(30, null)
.addRow(40, null)
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(projector.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(projector.output()));
}
{
// ... FROM table 3
@@ -899,8 +815,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(50, "dino")
.addRow(60, "barney")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(projector.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(projector.output()));
}
projector.close();
@@ -926,9 +842,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ReaderSchemaOrchestrator reader = scanner.startReader();
ResultSetLoader loader = reader.makeTableLoader(schema1);
- tracker.trackSchema(scanner.output());
- schemaVersion = tracker.schemaVersion();
-
// Create a batch
reader.startBatch();
@@ -936,6 +849,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow("fred")
.addRow("wilma");
reader.endBatch();
+ tracker.trackSchema(scanner.output());
+ schemaVersion = tracker.schemaVersion();
// Verify
@@ -944,8 +859,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow("wilma")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.closeReader();
}
{
@@ -960,8 +875,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ReaderSchemaOrchestrator reader = scanner.startReader();
ResultSetLoader loader = reader.makeTableLoader(schema2);
- tracker.trackSchema(scanner.output());
- assertEquals(schemaVersion, tracker.schemaVersion());
// Create a batch
@@ -973,13 +886,15 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify, using persistent schema
+ tracker.trackSchema(scanner.output());
+ assertEquals(schemaVersion, tracker.schemaVersion());
SingleRowSet expected = fixture.rowSetBuilder(schema1)
.addRow("barney")
.addRow("betty")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.closeReader();
}
{
@@ -994,9 +909,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
ReaderSchemaOrchestrator reader = scanner.startReader();
ResultSetLoader loader = reader.makeTableLoader(schema3);
- tracker.trackSchema(scanner.output());
- assertEquals(schemaVersion, tracker.schemaVersion());
-
// Create a batch
reader.startBatch();
@@ -1007,13 +919,16 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
// Verify, using persistent schema
+ tracker.trackSchema(scanner.output());
+ assertEquals(schemaVersion, tracker.schemaVersion());
+
SingleRowSet expected = fixture.rowSetBuilder(schema1)
.addRow("bam-bam")
.addRow("pebbles")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.closeReader();
}
@@ -1073,8 +988,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(10, "fred", 110L)
.addRow(20, "wilma", 110L)
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.closeReader();
}
@@ -1100,8 +1015,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(30, "bambam", 330L)
.addRow(40, "betty", 440L)
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
{
// ... FROM table 3
@@ -1125,8 +1040,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest {
.addRow(50, "dino", 550L)
.addRow(60, "barney", 660L)
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
scanner.close();
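A recurring change in the hunks above, and in the files that follow, swaps the two-step new RowSetComparison(expected).verifyAndClearAll(...) idiom for a single RowSetUtilities.verify(expected, actual) call. The helper below sketches a plausible equivalent of that shorthand so the intent of the replacement is clear; Drill's actual RowSetUtilities may be implemented differently.

import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;

// Sketch of the verify() shorthand: compare the expected and actual row sets,
// then release the buffers of both, exactly as the replaced two-line idiom did.
final class RowSetVerifySketch {
  private RowSetVerifySketch() { }

  static void verify(RowSet expected, RowSet actual) {
    new RowSetComparison(expected).verifyAndClearAll(actual);
  }
}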
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
index 0cf2cba31..84ffc4e32 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java
@@ -18,10 +18,12 @@
package org.apache.drill.exec.physical.impl.scan;
import static org.junit.Assert.assertFalse;
+
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
@@ -32,6 +34,7 @@ import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
/**
* Test the late-schema support in the scan orchestrator. "Late schema" is the case
@@ -43,6 +46,7 @@ import org.junit.Test;
* that tests for lower-level components have already passed.
*/
+@Category(RowSetTests.class)
public class TestScanOrchestratorLateSchema extends SubOperatorTest {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
index e10055114..c7b52e2da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java
@@ -20,14 +20,15 @@ package org.apache.drill.exec.physical.impl.scan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
import org.apache.drill.exec.record.BatchSchema;
@@ -35,10 +36,10 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
/**
@@ -46,6 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
* with implicit file columns provided by the file metadata manager.
*/
+@Category(RowSetTests.class)
public class TestScanOrchestratorMetadata extends SubOperatorTest {
/**
@@ -104,8 +106,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
.addRow(2, "wilma", "/w/x/y/z.csv", "/w/x/y", "z.csv", "csv", "x", "y")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
@@ -146,19 +148,9 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
- // Verify empty batch.
-
BatchSchema expectedSchema = new SchemaBuilder()
.addNullable("c", MinorType.INT)
.build();
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data.
@@ -170,15 +162,13 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addSingleCol(null)
- .addSingleCol(null)
- .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addSingleCol(null)
+ .addSingleCol(null)
+ .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
@@ -229,6 +219,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
// Verify empty batch.
+ reader.defineSchema();
BatchSchema expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
@@ -240,8 +231,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
.build();
assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
// Create a batch of data.
@@ -260,8 +251,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
.addRow(2, "wilma", "x", "csv")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
}
scanner.close();
@@ -302,22 +293,12 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
ResultSetLoader loader = reader.makeTableLoader(tableSchema);
- // Verify empty batch.
-
BatchSchema expectedSchema = new SchemaBuilder()
.addNullable("dir0", MinorType.VARCHAR)
.add("b", MinorType.VARCHAR)
.add("suffix", MinorType.VARCHAR)
.addNullable("c", MinorType.INT)
.build();
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .build();
-
- assertNotNull(scanner.output());
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
// Create a batch of data.
@@ -329,15 +310,13 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
// Verify
- {
- SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
- .addRow("x", "fred", "csv", null)
- .addRow("x", "wilma", "csv", null)
- .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("x", "fred", "csv", null)
+ .addRow("x", "wilma", "csv", null)
+ .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
- }
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.close();
}
@@ -403,8 +382,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
.addRow("x", "y", "a.csv", "fred")
.addRow("x", "y", "a.csv", "wilma")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
// Do explicit close (as in real code) to avoid an implicit
// close which will blow away the current file info...
@@ -431,8 +410,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest {
.addRow("x", null, "b.csv", "bambam")
.addRow("x", null, "b.csv", "betty")
.build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(scanner.output()));
+ RowSetUtilities.verify(expected,
+ fixture.wrap(scanner.output()));
scanner.closeReader();
}
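The metadata tests above expect implicit file columns alongside the data: for the file /w/x/y/z.csv they project fqn, filepath, filename and suffix, plus the partition directories dir0 = "x" and dir1 = "y". The sketch below derives those values from a path in plain Java to make the mapping concrete; it is an illustration, not Drill's FileMetadataManager, and the scan root of "/w" used in main() is an assumption.

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only -- not Drill's FileMetadataManager.
final class ImplicitColumnsSketch {

  static Map<String, String> implicitColumns(String rootDir, String filePath) {
    Path file = Paths.get(filePath);
    String name = file.getFileName().toString();
    int dot = name.lastIndexOf('.');

    Map<String, String> cols = new LinkedHashMap<>();
    cols.put("fqn", filePath);                               // full path to the file
    cols.put("filepath", file.getParent().toString());       // directory holding the file
    cols.put("filename", name);
    cols.put("suffix", dot < 0 ? "" : name.substring(dot + 1));

    // Directories between the scan root and the file become dir0, dir1, ...
    Path rel = Paths.get(rootDir).relativize(file.getParent());
    if (!rel.toString().isEmpty()) {
      int i = 0;
      for (Path part : rel) {
        cols.put("dir" + i++, part.toString());
      }
    }
    return cols;
  }

  public static void main(String[] args) {
    // For a scan root of "/w" (an assumption) and the file "/w/x/y/z.csv" this
    // prints fqn, filepath, filename, suffix plus dir0 = "x" and dir1 = "y",
    // matching the expected rows in the tests above.
    System.out.println(implicitColumns("/w", "/w/x/y/z.csv"));
  }
}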
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java
deleted file mode 100644
index 021d7e381..000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
-import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
-import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
-import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.test.SubOperatorTest;
-import org.junit.Test;
-
-public class TestSchemaLevelProjection extends SubOperatorTest {
-
- @Test
- public void testWildcard() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
- assertEquals(1, scanProj.columns().size());
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .addNullable("c", MinorType.INT)
- .addArray("d", MinorType.FLOAT8)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new WildcardSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(3, columns.size());
-
- assertEquals("a", columns.get(0).name());
- assertEquals(0, columns.get(0).sourceIndex());
- assertSame(rootTuple, columns.get(0).source());
- assertEquals("c", columns.get(1).name());
- assertEquals(1, columns.get(1).sourceIndex());
- assertSame(rootTuple, columns.get(1).source());
- assertEquals("d", columns.get(2).name());
- assertEquals(2, columns.get(2).sourceIndex());
- assertSame(rootTuple, columns.get(2).source());
- }
-
- /**
- * Test SELECT list with columns defined in a different order, and
- * with name case different from that of the early-schema table.
- */
-
- @Test
- public void testFullList() {
-
- // Simulate SELECT c, b, a ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("c", "b", "a"),
- ScanTestUtils.parsers());
- assertEquals(3, scanProj.columns().size());
-
- // Simulate a data source, with early schema, of (a, b, c)
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("A", MinorType.VARCHAR)
- .add("B", MinorType.VARCHAR)
- .add("C", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(3, columns.size());
-
- assertEquals("c", columns.get(0).name());
- assertEquals(2, columns.get(0).sourceIndex());
- assertSame(rootTuple, columns.get(0).source());
-
- assertEquals("b", columns.get(1).name());
- assertEquals(1, columns.get(1).sourceIndex());
- assertSame(rootTuple, columns.get(1).source());
-
- assertEquals("a", columns.get(2).name());
- assertEquals(0, columns.get(2).sourceIndex());
- assertSame(rootTuple, columns.get(2).source());
- }
-
- /**
- * Test SELECT list with columns missing from the table schema.
- */
-
- @Test
- public void testMissing() {
-
- // Simulate SELECT c, v, b, w ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("c", "v", "b", "w"),
- ScanTestUtils.parsers());
- assertEquals(4, scanProj.columns().size());
-
- // Simulate a data source, with early schema, of (a, b, c)
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("A", MinorType.VARCHAR)
- .add("B", MinorType.VARCHAR)
- .add("C", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(4, columns.size());
- VectorSource nullBuilder = rootTuple.nullBuilder();
-
- assertEquals("c", columns.get(0).name());
- assertEquals(2, columns.get(0).sourceIndex());
- assertSame(rootTuple, columns.get(0).source());
-
- assertEquals("v", columns.get(1).name());
- assertEquals(0, columns.get(1).sourceIndex());
- assertSame(nullBuilder, columns.get(1).source());
-
- assertEquals("b", columns.get(2).name());
- assertEquals(1, columns.get(2).sourceIndex());
- assertSame(rootTuple, columns.get(2).source());
-
- assertEquals("w", columns.get(3).name());
- assertEquals(1, columns.get(3).sourceIndex());
- assertSame(nullBuilder, columns.get(3).source());
- }
-
- @Test
- public void testSubset() {
-
- // Simulate SELECT c, a ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("c", "a"),
- ScanTestUtils.parsers());
- assertEquals(2, scanProj.columns().size());
-
- // Simulate a data source, with early schema, of (a, b, c)
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("A", MinorType.VARCHAR)
- .add("B", MinorType.VARCHAR)
- .add("C", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(2, columns.size());
-
- assertEquals("c", columns.get(0).name());
- assertEquals(2, columns.get(0).sourceIndex());
- assertSame(rootTuple, columns.get(0).source());
-
- assertEquals("a", columns.get(1).name());
- assertEquals(0, columns.get(1).sourceIndex());
- assertSame(rootTuple, columns.get(1).source());
- }
-
- @Test
- public void testDisjoint() {
-
- // Simulate SELECT b ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("b"),
- ScanTestUtils.parsers());
- assertEquals(1, scanProj.columns().size());
-
- // Simulate a data source, with early schema, of (a)
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("A", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(1, columns.size());
- VectorSource nullBuilder = rootTuple.nullBuilder();
-
- assertEquals("b", columns.get(0).name());
- assertEquals(0, columns.get(0).sourceIndex());
- assertSame(nullBuilder, columns.get(0).source());
- }
-
- @Test
- public void testOmittedMap() {
-
- // Simulate SELECT a, b.c.d ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a", "b.c.d"),
- ScanTestUtils.parsers());
- assertEquals(2, scanProj.columns().size());
- {
- assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType());
- UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1));
- assertTrue(bCol.element().isTuple());
- }
-
- // Simulate a data source, with early schema, of (a)
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(2, columns.size());
-
- // Should have resolved a to a table column, b to a missing map.
-
- // A is projected
-
- ResolvedColumn aCol = columns.get(0);
- assertEquals("a", aCol.name());
- assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
-
- // B is not in the table, is implicitly a map
-
- ResolvedColumn bCol = columns.get(1);
- assertEquals("b", bCol.name());
- assertEquals(ResolvedMapColumn.ID, bCol.nodeType());
-
- ResolvedMapColumn bMap = (ResolvedMapColumn) bCol;
- ResolvedTuple bMembers = bMap.members();
- assertNotNull(bMembers);
- assertEquals(1, bMembers.columns().size());
-
- // C is a map within b
-
- ResolvedColumn cCol = bMembers.columns().get(0);
- assertEquals(ResolvedMapColumn.ID, cCol.nodeType());
-
- ResolvedMapColumn cMap = (ResolvedMapColumn) cCol;
- ResolvedTuple cMembers = cMap.members();
- assertNotNull(cMembers);
- assertEquals(1, cMembers.columns().size());
-
- // D is an unknown column type (not a map)
-
- ResolvedColumn dCol = cMembers.columns().get(0);
- assertEquals(ResolvedNullColumn.ID, dCol.nodeType());
- }
-
- /**
- * Test of a map with missing columns.
- * Table of (x, y, a{b, c}); project x, a.c, a.d, a.e.f, y.
- */
-
- @Test
- public void testOmittedMapMembers() {
-
- // Simulate SELECT a.c, a.d, a.e.f ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"),
- ScanTestUtils.parsers());
- assertEquals(3, scanProj.columns().size());
-
- // Simulate a data source, with early schema, of (x, y, a{b, c})
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("x", MinorType.VARCHAR)
- .add("y", MinorType.INT)
- .addMap("a")
- .add("b", MinorType.BIGINT)
- .add("c", MinorType.FLOAT8)
- .resumeSchema()
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(3, columns.size());
-
- // Should have resolved a.c to a table column within the map,
- // a.d to a missing (null) member, and a.e.f to a missing
- // nested map member
-
- // X is projected
-
- ResolvedColumn xCol = columns.get(0);
- assertEquals("x", xCol.name());
- assertEquals(ResolvedTableColumn.ID, xCol.nodeType());
- assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source());
- assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex());
-
- // Y is projected
-
- ResolvedColumn yCol = columns.get(2);
- assertEquals("y", yCol.name());
- assertEquals(ResolvedTableColumn.ID, yCol.nodeType());
- assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source());
- assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex());
-
- // A is projected
-
- ResolvedColumn aCol = columns.get(1);
- assertEquals("a", aCol.name());
- assertEquals(ResolvedMapColumn.ID, aCol.nodeType());
-
- ResolvedMapColumn aMap = (ResolvedMapColumn) aCol;
- ResolvedTuple aMembers = aMap.members();
- assertFalse(aMembers.isSimpleProjection());
- assertNotNull(aMembers);
- assertEquals(3, aMembers.columns().size());
-
- // a.c is projected
-
- ResolvedColumn acCol = aMembers.columns().get(0);
- assertEquals("c", acCol.name());
- assertEquals(ResolvedTableColumn.ID, acCol.nodeType());
- assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex());
-
- // a.d is not in the table, is null
-
- ResolvedColumn adCol = aMembers.columns().get(1);
- assertEquals("d", adCol.name());
- assertEquals(ResolvedNullColumn.ID, adCol.nodeType());
-
- // a.e is not in the table, is implicitly a map
-
- ResolvedColumn aeCol = aMembers.columns().get(2);
- assertEquals("e", aeCol.name());
- assertEquals(ResolvedMapColumn.ID, aeCol.nodeType());
-
- ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol;
- ResolvedTuple aeMembers = aeMap.members();
- assertFalse(aeMembers.isSimpleProjection());
- assertNotNull(aeMembers);
- assertEquals(1, aeMembers.columns().size());
-
- // a.e.f is a null column
-
- ResolvedColumn aefCol = aeMembers.columns().get(0);
- assertEquals("f", aefCol.name());
- assertEquals(ResolvedNullColumn.ID, aefCol.nodeType());
- }
-
- /**
- * Simple map project. This is an internal case in which the
- * query asks for a set of columns inside a map, and the table
- * loader produces exactly that set. No special projection is
- * needed, the map is projected as a whole.
- */
-
- @Test
- public void testSimpleMapProject() {
-
- // Simulate SELECT a.b, a.c ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a.b", "a.c"),
- ScanTestUtils.parsers());
- assertEquals(1, scanProj.columns().size());
-
- // Simulate a data source, with early schema, of (a{b, c})
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .addMap("a")
- .add("b", MinorType.BIGINT)
- .add("c", MinorType.FLOAT8)
- .resumeSchema()
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(1, columns.size());
-
- // Should have resolved the a.b, a.c projection to the table's
- // map column a, projected as a whole
-
- // a is projected as a vector, not as a structured map
-
- ResolvedColumn aCol = columns.get(0);
- assertEquals("a", aCol.name());
- assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
- assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
- assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
- }
-
- /**
- * Project of a non-map as a map
- */
-
- @Test
- public void testMapMismatch() {
-
- // Simulate SELECT a.b ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a.b"),
- ScanTestUtils.parsers());
-
- // Simulate a data source, with early schema, of (a)
- // where a is not a map.
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- try {
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
- fail();
- } catch (UserException e) {
- // Expected
- }
- }
-
- /**
- * Test project of an array. At the scan level, we just verify
- * that the requested column is, indeed, an array.
- */
-
- @Test
- public void testArrayProject() {
-
- // Simulate SELECT a[0] ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a[0]"),
- ScanTestUtils.parsers());
-
- // Simulate a data source, with early schema, of (a)
- // where a is an array.
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .addArray("a", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(1, columns.size());
-
- ResolvedColumn aCol = columns.get(0);
- assertEquals("a", aCol.name());
- assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
- assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
- assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
- }
-
- /**
- * Project of a non-array as an array
- */
-
- @Test
- public void testArrayMismatch() {
-
- // Simulate SELECT a[0] ...
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList("a[0]"),
- ScanTestUtils.parsers());
-
- // Simulate a data source, with early schema, of (a)
- // where a is not an array.
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.VARCHAR)
- .buildSchema();
-
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- try {
- new ExplicitSchemaProjection(
- scanProj, tableSchema, rootTuple,
- ScanTestUtils.resolvers());
- fail();
- } catch (UserException e) {
- // Expected
- }
- }
-}
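
The deleted tests above all repeat one setup shape: build a NullColumnBuilder, wrap it in a ResolvedRow, then run a projection against the table schema. A hedged sketch of that shared shape as a hypothetical helper (the name resolveExplicit is illustrative, not part of this commit):

    // Illustrative helper only; mirrors the setup repeated in the tests above.
    private ResolvedRow resolveExplicit(ScanLevelProjection scanProj,
        TupleMetadata tableSchema) {
      NullColumnBuilder builder = new NullColumnBuilder(null, false);
      ResolvedRow rootTuple = new ResolvedRow(builder);
      new ExplicitSchemaProjection(scanProj, tableSchema, rootTuple,
          ScanTestUtils.resolvers());
      return rootTuple;
    }
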
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java
deleted file mode 100644
index a30321b0d..000000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
-import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
-import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
-import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
-import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother;
-import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
-import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection;
-import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
-import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
-import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
-import org.apache.drill.exec.record.metadata.SchemaBuilder;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-/**
- * Tests schema smoothing at the schema projection level.
- * This level handles reusing prior types when filling null
- * values. But, because no actual vectors are involved, it
- * does not handle the schema chosen for a table ahead of
- * time, only the schema as it is merged with prior schema to
- * detect missing columns.
- * <p>
- * Focuses on the <tt>SmoothingProjection</tt> class itself.
- * <p>
- * Note that, at present, schema smoothing does not work for entire
- * maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt>
- * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
- * not currently know how to recreate the map. The same is true of
- * lists and unions. Handling such cases is complex and is probably
- * better handled via a system that allows the user to specify their
- * intent by providing a schema to apply to the two files.
- */
-
-public class TestSchemaSmoothing extends SubOperatorTest {
-
- /**
- * Sanity test for the simple, discrete case. The purpose of
- * discrete is just to run the basic lifecycle in a way that
- * is compatible with the schema-persistence version.
- */
-
- @Test
- public void testDiscrete() {
-
- // Set up the file metadata manager
-
- Path filePathA = new Path("hdfs:///w/x/y/a.csv");
- Path filePathB = new Path("hdfs:///w/x/y/b.csv");
- FileMetadataManager metadataManager = new FileMetadataManager(
- fixture.getOptionManager(),
- new Path("hdfs:///w"),
- Lists.newArrayList(filePathA, filePathB));
-
- // Set up the scan level projection
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
- ScanTestUtils.parsers(metadataManager.projectionParser()));
-
- {
- // Define a file a.csv
-
- metadataManager.startFile(filePathA);
-
- // Build the output schema from the (a, b) table schema
-
- TupleMetadata twoColSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .addNullable("b", MinorType.VARCHAR, 10)
- .buildSchema();
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, twoColSchema, rootTuple,
- ScanTestUtils.resolvers(metadataManager));
-
- // Verify the full output schema
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("filename", MinorType.VARCHAR)
- .add("a", MinorType.INT)
- .addNullable("b", MinorType.VARCHAR, 10)
- .buildSchema();
-
- // Verify
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(3, columns.size());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
- assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value());
- assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
- }
- {
- // Define a file b.csv
-
- metadataManager.startFile(filePathB);
-
- // Build the output schema from the (a) table schema
-
- TupleMetadata oneColSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .buildSchema();
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new ExplicitSchemaProjection(
- scanProj, oneColSchema, rootTuple,
- ScanTestUtils.resolvers(metadataManager));
-
- // Verify the full output schema
- // Since this mode is "discrete", we don't remember the type
- // of the missing column. (Instead, it is filled in at the
- // vector level as part of vector persistence.) During projection, it is
- // marked with type NULL so that the null column builder will fill in
- // the proper type.
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .add("filename", MinorType.VARCHAR)
- .add("a", MinorType.INT)
- .addNullable("b", MinorType.NULL)
- .buildSchema();
-
- // Verify
-
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals(3, columns.size());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
- assertEquals("b.csv", ((FileMetadataColumn) columns.get(0)).value());
- assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
- assertEquals(ResolvedNullColumn.ID, columns.get(2).nodeType());
- }
- }
-
- /**
- * Low-level test of the smoothing projection, including the exceptions
- * it throws when things are not going its way.
- */
-
- @Test
- public void testSmoothingProjection() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- // Table 1: (a: nullable bigint, b)
-
- TupleMetadata schema1 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .addNullable("b", MinorType.VARCHAR)
- .add("c", MinorType.FLOAT8)
- .buildSchema();
- ResolvedRow priorSchema;
- {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new WildcardSchemaProjection(
- scanProj, schema1, rootTuple,
- ScanTestUtils.resolvers());
- priorSchema = rootTuple;
- }
-
- // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
-
- TupleMetadata schema2 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .add("c", MinorType.FLOAT8)
- .buildSchema();
- try {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new SmoothingProjection(
- scanProj, schema2, priorSchema, rootTuple,
- ScanTestUtils.resolvers());
- assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
- priorSchema = rootTuple;
- } catch (IncompatibleSchemaException e) {
- fail();
- }
-
- // Table 3: (a, c, d), column added, must replan schema
-
- TupleMetadata schema3 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .addNullable("b", MinorType.VARCHAR)
- .add("c", MinorType.FLOAT8)
- .add("d", MinorType.INT)
- .buildSchema();
- try {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new SmoothingProjection(
- scanProj, schema3, priorSchema, rootTuple,
- ScanTestUtils.resolvers());
- fail();
- } catch (IncompatibleSchemaException e) {
- // Expected
- }
-
- // Table 4: (a: double), change type must replan schema
-
- TupleMetadata schema4 = new SchemaBuilder()
- .addNullable("a", MinorType.FLOAT8)
- .addNullable("b", MinorType.VARCHAR)
- .add("c", MinorType.FLOAT8)
- .buildSchema();
- try {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new SmoothingProjection(
- scanProj, schema4, priorSchema, rootTuple,
- ScanTestUtils.resolvers());
- fail();
- } catch (IncompatibleSchemaException e) {
- // Expected
- }
-
-// // Table 5: (a: not-nullable bigint): convert to nullable for consistency
-//
-// TupleMetadata schema5 = new SchemaBuilder()
-// .addNullable("a", MinorType.BIGINT)
-// .add("c", MinorType.FLOAT8)
-// .buildSchema();
-// try {
-// SmoothingProjection schemaProj = new SmoothingProjection(
-// scanProj, schema5, dummySource, dummySource,
-// new ArrayList<>(), priorSchema);
-// assertTrue(schema1.isEquivalent(ScanTestUtils.schema(schemaProj.columns())));
-// } catch (IncompatibleSchemaException e) {
-// fail();
-// }
-
- // Table 6: Drop a non-nullable column, must replan
-
- TupleMetadata schema6 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .addNullable("b", MinorType.VARCHAR)
- .buildSchema();
- try {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- new SmoothingProjection(
- scanProj, schema6, priorSchema, rootTuple,
- ScanTestUtils.resolvers());
- fail();
- } catch (IncompatibleSchemaException e) {
- // Expected
- }
- }
-
- /**
- * Case in which the table schema is a superset of the prior
- * schema. Discard prior schema. Turn off auto expansion of
- * metadata for a simpler test.
- */
-
- @Test
- public void testSmaller() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
-
- {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- smoother.resolve(priorSchema, rootTuple);
- assertEquals(1, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
- }
- {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- smoother.resolve(tableSchema, rootTuple);
- assertEquals(2, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
- }
- }
-
- /**
- * Case in which the table schema and prior are disjoint
- * sets. Discard the prior schema.
- */
-
- @Test
- public void testDisjoint() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("b", MinorType.VARCHAR)
- .buildSchema();
-
- {
- doResolve(smoother, priorSchema);
- }
- {
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(2, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
- }
- }
-
- private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) {
- NullColumnBuilder builder = new NullColumnBuilder(null, false);
- ResolvedRow rootTuple = new ResolvedRow(builder);
- smoother.resolve(schema, rootTuple);
- return rootTuple;
- }
-
- /**
- * Column names match, but types differ. Discard the prior schema.
- */
-
- @Test
- public void testDifferentTypes() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .addNullable("b", MinorType.VARCHAR)
- .buildSchema();
-
- {
- doResolve(smoother, priorSchema);
- }
- {
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(2, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
- }
- }
-
- /**
- * The prior and table schemas are identical. Preserve the prior
- * schema (though, the output is no different than if we discarded
- * the prior schema...)
- */
-
- @Test
- public void testSameSchemas() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
-
- {
- doResolve(smoother, priorSchema);
- }
- {
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(1, smoother.schemaVersion());
- TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
- assertTrue(actualSchema.isEquivalent(tableSchema));
- assertTrue(actualSchema.isEquivalent(priorSchema));
- }
- }
-
- /**
- * The prior and table schemas are identical, but the cases of names differ.
- * Preserve the case of the first schema.
- */
-
- @Test
- public void testDifferentCase() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("A", MinorType.INT)
- .add("B", MinorType.VARCHAR)
- .buildSchema();
-
- {
- doResolve(smoother, priorSchema);
- }
- {
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(1, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
- List<ResolvedColumn> columns = rootTuple.columns();
- assertEquals("a", columns.get(0).name());
- }
- }
-
- /**
- * Can't preserve the prior schema if it had required columns
- * that are missing from the new schema.
- */
-
- @Test
- public void testRequired() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .addNullable("b", MinorType.VARCHAR)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .addNullable("b", MinorType.VARCHAR)
- .buildSchema();
-
- {
- doResolve(smoother, priorSchema);
- }
- {
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(2, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
- }
- }
-
- /**
- * Preserve the prior schema if table is a subset and missing columns
- * are nullable or repeated.
- */
-
- @Test
- public void testMissingNullableColumns() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .addNullable("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .addArray("c", MinorType.BIGINT)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("b", MinorType.VARCHAR)
- .buildSchema();
-
- {
- doResolve(smoother, priorSchema);
- }
- {
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(1, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
- }
- }
-
- /**
- * Preserve the prior schema if table is a subset. Map the table
- * columns to the output using the prior schema ordering.
- */
-
- @Test
- public void testReordering() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- TupleMetadata priorSchema = new SchemaBuilder()
- .addNullable("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .addArray("c", MinorType.BIGINT)
- .buildSchema();
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("b", MinorType.VARCHAR)
- .addNullable("a", MinorType.INT)
- .buildSchema();
-
- {
- doResolve(smoother, priorSchema);
- }
- {
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(1, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
- }
- }
-
- /**
- * If using the legacy wildcard expansion, reuse schema if partition paths
- * are the same length.
- */
-
- @Test
- public void testSamePartitionLength() {
-
- // Set up the file metadata manager
-
- Path filePathA = new Path("hdfs:///w/x/y/a.csv");
- Path filePathB = new Path("hdfs:///w/x/y/b.csv");
- FileMetadataManager metadataManager = new FileMetadataManager(
- fixture.getOptionManager(),
- new Path("hdfs:///w"),
- Lists.newArrayList(filePathA, filePathB));
-
- // Set up the scan level projection
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- ScanTestUtils.projectAllWithMetadata(2),
- ScanTestUtils.parsers(metadataManager.projectionParser()));
-
- // Define the schema smoother
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers(metadataManager));
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
-
- TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
- {
- metadataManager.startFile(filePathA);
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- }
- {
- metadataManager.startFile(filePathB);
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(1, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- }
- }
-
- /**
- * If using the legacy wildcard expansion, reuse schema if the new partition path
- * is shorter than the previous. (Unneeded partitions will be set to null by the
- * scan projector.)
- */
-
- @Test
- public void testShorterPartitionLength() {
-
- // Set up the file metadata manager
-
- Path filePathA = new Path("hdfs:///w/x/y/a.csv");
- Path filePathB = new Path("hdfs:///w/x/b.csv");
- FileMetadataManager metadataManager = new FileMetadataManager(
- fixture.getOptionManager(),
- new Path("hdfs:///w"),
- Lists.newArrayList(filePathA, filePathB));
-
- // Set up the scan level projection
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- ScanTestUtils.projectAllWithMetadata(2),
- ScanTestUtils.parsers(metadataManager.projectionParser()));
-
- // Define the schema smoother
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers(metadataManager));
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
-
- TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
- {
- metadataManager.startFile(filePathA);
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- }
- {
- metadataManager.startFile(filePathB);
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(1, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- }
- }
-
- /**
- * If using the legacy wildcard expansion, we are able to use the same
- * schema even if the new partition path is longer than the previous,
- * because all file names are provided up front.
- */
-
- @Test
- public void testLongerPartitionLength() {
-
- // Set up the file metadata manager
-
- Path filePathA = new Path("hdfs:///w/x/a.csv");
- Path filePathB = new Path("hdfs:///w/x/y/b.csv");
- FileMetadataManager metadataManager = new FileMetadataManager(
- fixture.getOptionManager(),
- new Path("hdfs:///w"),
- Lists.newArrayList(filePathA, filePathB));
-
- // Set up the scan level projection
-
- ScanLevelProjection scanProj = new ScanLevelProjection(
- ScanTestUtils.projectAllWithMetadata(2),
- ScanTestUtils.parsers(metadataManager.projectionParser()));
-
- // Define the schema smoother
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers(metadataManager));
-
- TupleMetadata tableSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .add("b", MinorType.VARCHAR)
- .buildSchema();
-
- TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
- {
- metadataManager.startFile(filePathA);
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- }
- {
- metadataManager.startFile(filePathB);
- ResolvedRow rootTuple = doResolve(smoother, tableSchema);
- assertEquals(1, smoother.schemaVersion());
- assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
- }
- }
-
- /**
- * Integrated test across multiple schemas at the batch level.
- */
-
- @Test
- public void testSmoothableSchemaBatches() {
- ScanLevelProjection scanProj = new ScanLevelProjection(
- RowSetTestUtils.projectAll(),
- ScanTestUtils.parsers());
-
- SchemaSmoother smoother = new SchemaSmoother(scanProj,
- ScanTestUtils.resolvers());
-
- // Table 1: (a: bigint, b)
-
- TupleMetadata schema1 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .addNullable("b", MinorType.VARCHAR)
- .add("c", MinorType.FLOAT8)
- .buildSchema();
- {
- ResolvedRow rootTuple = doResolve(smoother, schema1);
-
- // Just use the original schema.
-
- assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
- assertEquals(1, smoother.schemaVersion());
- }
-
- // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
-
- TupleMetadata schema2 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .add("c", MinorType.FLOAT8)
- .buildSchema();
- {
- ResolvedRow rootTuple = doResolve(smoother, schema2);
- assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
- assertEquals(1, smoother.schemaVersion());
- }
-
- // Table 3: (a, c, d), column added, must replan schema
-
- TupleMetadata schema3 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .addNullable("b", MinorType.VARCHAR)
- .add("c", MinorType.FLOAT8)
- .add("d", MinorType.INT)
- .buildSchema();
- {
- ResolvedRow rootTuple = doResolve(smoother, schema3);
- assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
- assertEquals(2, smoother.schemaVersion());
- }
-
- // Table 4: Drop a non-nullable column, must replan
-
- TupleMetadata schema4 = new SchemaBuilder()
- .addNullable("a", MinorType.BIGINT)
- .addNullable("b", MinorType.VARCHAR)
- .buildSchema();
- {
- ResolvedRow rootTuple = doResolve(smoother, schema4);
- assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
- assertEquals(3, smoother.schemaVersion());
- }
-
- // Table 5: (a: double), change type must replan schema
-
- TupleMetadata schema5 = new SchemaBuilder()
- .addNullable("a", MinorType.FLOAT8)
- .addNullable("b", MinorType.VARCHAR)
- .buildSchema();
- {
- ResolvedRow rootTuple = doResolve(smoother, schema5);
- assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
- assertEquals(4, smoother.schemaVersion());
- }
-
-// // Table 6: (a: not-nullable bigint): convert to nullable for consistency
-//
-// TupleMetadata schema6 = new SchemaBuilder()
-// .add("a", MinorType.FLOAT8)
-// .add("b", MinorType.VARCHAR)
-// .buildSchema();
-// {
-// SchemaLevelProjection schemaProj = smoother.resolve(schema3, dummySource);
-// assertTrue(schema5.isEquivalent(ScanTestUtils.schema(schemaProj.columns())));
-// }
- }
-
- /**
- * A SELECT * query uses the schema of the table as the output schema.
- * This is trivial when the scanner has one table. But, if two or more
- * tables occur, then things get interesting. The first table sets the
- * schema. The second table then has:
- * <ul>
- * <li>The same schema, trivial case.</li>
- * <li>A subset of the first table. The type of the "missing" column
- * from the first table is used for a null column in the second table.</li>
- * <li>A superset or disjoint set of the first schema. This triggers a hard schema
- * change.</li>
- * </ul>
- * <p>
- * It is an open question whether previous columns should be preserved on
- * a hard reset. For now, the code implements, and this test verifies, that a
- * hard reset clears the "memory" of prior schemas.
- */
-
- @Test
- public void testWildcardSmoothing() {
- ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator());
- projector.enableSchemaSmoothing(true);
- projector.build(RowSetTestUtils.projectAll());
-
- TupleMetadata firstSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .addNullable("b", MinorType.VARCHAR, 10)
- .addNullable("c", MinorType.BIGINT)
- .buildSchema();
- TupleMetadata subsetSchema = new SchemaBuilder()
- .addNullable("b", MinorType.VARCHAR, 10)
- .add("a", MinorType.INT)
- .buildSchema();
- TupleMetadata disjointSchema = new SchemaBuilder()
- .add("a", MinorType.INT)
- .addNullable("b", MinorType.VARCHAR, 10)
- .add("d", MinorType.VARCHAR)
- .buildSchema();
-
- SchemaTracker tracker = new SchemaTracker();
- int schemaVersion;
- {
- // First table, establishes the baseline
- // ... FROM table 1
-
- ReaderSchemaOrchestrator reader = projector.startReader();
- ResultSetLoader loader = reader.makeTableLoader(firstSchema);
-
- reader.startBatch();
- loader.writer()
- .addRow(10, "fred", 110L)
- .addRow(20, "wilma", 110L);
- reader.endBatch();
-
- tracker.trackSchema(projector.output());
- schemaVersion = tracker.schemaVersion();
-
- SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
- .addRow(10, "fred", 110L)
- .addRow(20, "wilma", 110L)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(projector.output()));
- }
- {
- // Second table, same schema, the trivial case
- // ... FROM table 2
-
- ReaderSchemaOrchestrator reader = projector.startReader();
- ResultSetLoader loader = reader.makeTableLoader(firstSchema);
-
- reader.startBatch();
- loader.writer()
- .addRow(70, "pebbles", 770L)
- .addRow(80, "hoppy", 880L);
- reader.endBatch();
-
- tracker.trackSchema(projector.output());
- assertEquals(schemaVersion, tracker.schemaVersion());
-
- SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
- .addRow(70, "pebbles", 770L)
- .addRow(80, "hoppy", 880L)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(projector.output()));
- }
- {
- // Third table: subset schema of first two
- // ... FROM table 3
-
- ReaderSchemaOrchestrator reader = projector.startReader();
- ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
-
- reader.startBatch();
- loader.writer()
- .addRow("bambam", 30)
- .addRow("betty", 40);
- reader.endBatch();
-
- tracker.trackSchema(projector.output());
- assertEquals(schemaVersion, tracker.schemaVersion());
-
- SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
- .addRow(30, "bambam", null)
- .addRow(40, "betty", null)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(projector.output()));
- }
- {
- // Fourth table: disjoint schema, causes a schema reset
- // ... FROM table 4
-
- ReaderSchemaOrchestrator reader = projector.startReader();
- ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
-
- reader.startBatch();
- loader.writer()
- .addRow(50, "dino", "supporting")
- .addRow(60, "barney", "main");
- reader.endBatch();
-
- tracker.trackSchema(projector.output());
- assertNotEquals(schemaVersion, tracker.schemaVersion());
-
- SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
- .addRow(50, "dino", "supporting")
- .addRow(60, "barney", "main")
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(fixture.wrap(projector.output()));
- }
-
- projector.close();
- }
-
- // TODO: Test schema smoothing with repeated
- // TODO: Test hard schema change
- // TODO: Typed null column tests (resurrect)
- // TODO: Test maps and arrays of maps
-}
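
The deleted file above exercised the SchemaSmoother lifecycle: resolve each table's schema in turn, reusing the prior schema when possible and replanning when not. A minimal sketch of that loop, assuming a hypothetical tableSchemas list of per-file schemas (illustrative only, assembled from the calls shown in the deleted tests):

    // Illustrative only: the per-table smoothing loop.
    SchemaSmoother smoother = new SchemaSmoother(scanProj, ScanTestUtils.resolvers());
    for (TupleMetadata tableSchema : tableSchemas) {
      NullColumnBuilder builder = new NullColumnBuilder(null, false);
      ResolvedRow rootTuple = new ResolvedRow(builder);
      smoother.resolve(tableSchema, rootTuple);
      // schemaVersion() advances only when the prior schema cannot be reused.
      int version = smoother.schemaVersion();
    }
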
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
index b45374be2..086da96b4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
@@ -24,14 +24,21 @@ import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadata;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumnDefn;
+import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn;
import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -114,4 +121,42 @@ public class TestConstantColumnLoader extends SubOperatorTest {
.verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
staticLoader.close();
}
+
+ @Test
+ public void testFileMetadata() {
+
+ FileMetadata fileInfo = new FileMetadata(new Path("hdfs:///w/x/y/z.csv"), new Path("hdfs:///w"));
+ List<ConstantColumnSpec> defns = new ArrayList<>();
+ FileMetadataColumnDefn iDefn = new FileMetadataColumnDefn(
+ ScanTestUtils.SUFFIX_COL, ImplicitFileColumns.SUFFIX);
+ FileMetadataColumn iCol = new FileMetadataColumn(ScanTestUtils.SUFFIX_COL,
+ iDefn, fileInfo, null, 0);
+ defns.add(iCol);
+
+ String partColName = ScanTestUtils.partitionColName(1);
+ PartitionColumn pCol = new PartitionColumn(partColName, 1, fileInfo, null, 0);
+ defns.add(pCol);
+
+ ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
+
+ // Create a batch
+
+ staticLoader.load(2);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR)
+ .addNullable(partColName, MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("csv", "y")
+ .addRow("csv", "y")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
+ staticLoader.close();
+ }
}
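
In the new testFileMetadata case above, the expected rows ("csv", "y") follow from the file path alone: relative to the scan root hdfs:///w, the file hdfs:///w/x/y/z.csv has partition directories [x, y], so partition index 1 resolves to "y" and the file suffix to "csv". The constant loader then repeats those values once per loaded row:

    // Values taken from the test itself; shown only to restate the mapping.
    FileMetadata fileInfo = new FileMetadata(
        new Path("hdfs:///w/x/y/z.csv"), new Path("hdfs:///w"));
    // partition 0 -> "x", partition 1 -> "y", suffix -> "csv"
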
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index 5b49ab3c7..f40e84787 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.project;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -27,6 +28,8 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
import org.apache.drill.exec.record.metadata.ProjectionType;
import org.apache.drill.test.SubOperatorTest;
@@ -56,6 +59,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
final ScanLevelProjection scanProj = new ScanLevelProjection(
RowSetTestUtils.projectList("a", "b", "c"),
ScanTestUtils.parsers());
+
assertFalse(scanProj.projectAll());
assertFalse(scanProj.projectNone());
@@ -72,6 +76,19 @@ public class TestScanLevelProjection extends SubOperatorTest {
// Verify column type
assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+ // Verify tuple projection
+
+ RequestedTuple outputProj = scanProj.rootProjection();
+ assertEquals(3, outputProj.projections().size());
+ assertNotNull(outputProj.get("a"));
+ assertTrue(outputProj.get("a").isSimple());
+
+ RequestedTuple readerProj = scanProj.readerProjection();
+ assertEquals(3, readerProj.projections().size());
+ assertNotNull(readerProj.get("a"));
+ assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
+ assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d"));
}
/**
@@ -85,6 +102,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
final ScanLevelProjection scanProj = new ScanLevelProjection(
RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
ScanTestUtils.parsers());
+
assertFalse(scanProj.projectAll());
assertFalse(scanProj.projectNone());
@@ -107,6 +125,20 @@ public class TestScanLevelProjection extends SubOperatorTest {
final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
assertTrue(c.isSimple());
+
+ // Verify tuple projection
+
+ RequestedTuple outputProj = scanProj.rootProjection();
+ assertEquals(3, outputProj.projections().size());
+ assertNotNull(outputProj.get("a"));
+ assertTrue(outputProj.get("a").isTuple());
+
+ RequestedTuple readerProj = scanProj.readerProjection();
+ assertEquals(3, readerProj.projections().size());
+ assertNotNull(readerProj.get("a"));
+ assertEquals(ProjectionType.TUPLE, readerProj.projectionType("a"));
+ assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c"));
+ assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d"));
}
/**
@@ -119,6 +151,7 @@ public class TestScanLevelProjection extends SubOperatorTest {
final ScanLevelProjection scanProj = new ScanLevelProjection(
RowSetTestUtils.projectList("a[1]", "a[3]"),
ScanTestUtils.parsers());
+
assertFalse(scanProj.projectAll());
assertFalse(scanProj.projectNone());
@@ -137,6 +170,19 @@ public class TestScanLevelProjection extends SubOperatorTest {
assertTrue(a.hasIndex(1));
assertFalse(a.hasIndex(2));
assertTrue(a.hasIndex(3));
+
+ // Verify tuple projection
+
+ RequestedTuple outputProj = scanProj.rootProjection();
+ assertEquals(1, outputProj.projections().size());
+ assertNotNull(outputProj.get("a"));
+ assertTrue(outputProj.get("a").isArray());
+
+ RequestedTuple readerProj = scanProj.readerProjection();
+ assertEquals(1, readerProj.projections().size());
+ assertNotNull(readerProj.get("a"));
+ assertEquals(ProjectionType.ARRAY, readerProj.projectionType("a"));
+ assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("c"));
}
/**
@@ -165,6 +211,17 @@ public class TestScanLevelProjection extends SubOperatorTest {
// Verify column type
assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
+
+ // Verify tuple projection
+
+ RequestedTuple outputProj = scanProj.rootProjection();
+ assertEquals(1, outputProj.projections().size());
+ assertNotNull(outputProj.get("**"));
+ assertTrue(outputProj.get("**").isWildcard());
+
+ RequestedTuple readerProj = scanProj.readerProjection();
+ assertTrue(readerProj instanceof ImpliedTupleRequest);
+ assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
}
/**
@@ -181,38 +238,62 @@ public class TestScanLevelProjection extends SubOperatorTest {
assertFalse(scanProj.projectAll());
assertTrue(scanProj.projectNone());
assertEquals(0, scanProj.requestedCols().size());
+
+ // Verify tuple projection
+
+ RequestedTuple outputProj = scanProj.rootProjection();
+ assertEquals(0, outputProj.projections().size());
+
+ RequestedTuple readerProj = scanProj.readerProjection();
+ assertTrue(readerProj instanceof ImpliedTupleRequest);
+ assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("a"));
}
/**
- * Can't include both a wildcard and a column name.
+ * Can include both a wildcard and a column name. The Project
+ * operator will fill in the column; the scan framework just ignores
+ * the extra column.
*/
@Test
- public void testErrorWildcardAndColumns() {
- try {
- new ScanLevelProjection(
+ public void testWildcardAndColumns() {
+ ScanLevelProjection scanProj = new ScanLevelProjection(
RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
ScanTestUtils.parsers());
- fail();
- } catch (final IllegalArgumentException e) {
- // Expected
- }
+
+ assertTrue(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+ assertEquals(2, scanProj.requestedCols().size());
+ assertEquals(1, scanProj.columns().size());
+
+ // Verify tuple projection
+
+ RequestedTuple outputProj = scanProj.rootProjection();
+ assertEquals(2, outputProj.projections().size());
+ assertNotNull(outputProj.get("**"));
+ assertTrue(outputProj.get("**").isWildcard());
+ assertNotNull(outputProj.get("a"));
+
+ RequestedTuple readerProj = scanProj.readerProjection();
+ assertTrue(readerProj instanceof ImpliedTupleRequest);
+ assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a"));
+ assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c"));
}
/**
- * Can't include both a column name and a wildcard.
+ * Test the reverse order: a column name followed by a wildcard.
*/
@Test
- public void testErrorColumnAndWildcard() {
- try {
- new ScanLevelProjection(
+ public void testColumnAndWildcard() {
+ ScanLevelProjection scanProj = new ScanLevelProjection(
RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
ScanTestUtils.parsers());
- fail();
- } catch (final IllegalArgumentException e) {
- // Expected
- }
+
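+ // Same outcome as above: the order of the column and the wildcard
+ // does not matter.
+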
+ assertTrue(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+ assertEquals(2, scanProj.requestedCols().size());
+ assertEquals(1, scanProj.columns().size());
}
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index a21b1e472..8adc0372a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -28,8 +28,9 @@ import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
+import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
@@ -38,8 +39,10 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
/**
* Tests schema smoothing at the schema projection level.
@@ -63,7 +66,7 @@ import org.junit.experimental.categories.Category;
* to a fundamental limitation in Drill:
* <ul>
* <li>Drill cannot predict the future: each file (or batch)
- * may have a different schema.</ul>
+ * may have a different schema.</li>
* <li>Drill does not know about these differences until they
* occur.</li>
* <li>The scan operator is obligated to return the same schema
@@ -85,6 +88,105 @@ import org.junit.experimental.categories.Category;
public class TestSchemaSmoothing extends SubOperatorTest {
/**
+ * Sanity test for the simple, discrete case. The purpose of the
+ * discrete case is just to run the basic lifecycle in a way that
+ * is compatible with the schema-persistence version.
+ */
+
+ @Test
+ public void testDiscrete() {
+
+ // Set up the file metadata manager
+
+ Path filePathA = new Path("hdfs:///w/x/y/a.csv");
+ Path filePathB = new Path("hdfs:///w/x/y/b.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePathA, filePathB));
+
+ // Set up the scan level projection
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
+ ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+ {
+ // Define a file a.csv
+
+ metadataManager.startFile(filePathA);
+
+ // Build the output schema from the (a, b) table schema
+
+ TupleMetadata twoColSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .buildSchema();
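+
+ // Resolve the scan-level projection against this file's schema
+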
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, twoColSchema, rootTuple,
+ ScanTestUtils.resolvers(metadataManager));
+
+ // Verify the full output schema
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("filename", MinorType.VARCHAR)
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .buildSchema();
+
+ // Verify
+
+ List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
+ assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value());
+ assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
+ }
+ {
+ // Define a file b.csv
+
+ metadataManager.startFile(filePathB);
+
+ // Build the output schema from the (a) table schema
+
+ TupleMetadata oneColSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
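+
+ // Resolve against the one-column schema; the missing column "b" will
+ // be resolved as a null column
+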
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, oneColSchema, rootTuple,
+ ScanTestUtils.resolvers(metadataManager));
+
+ // Verify the full output schema
+ // Since this mode is "discrete", we don't remember the type
+ // of the missing column. (Instead, it is filled in at the
+ // vector level as part of vector persistence.) During projection, it is
+ // marked with type NULL so that the null column builder will fill in
+ // the proper type.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("filename", MinorType.VARCHAR)
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.NULL)
+ .buildSchema();
+
+ // Verify
+
+ List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
+ assertEquals("b.csv", ((FileMetadataColumn) columns.get(0)).value());
+ assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType());
+ assertEquals(ResolvedNullColumn.ID, columns.get(2).nodeType());
+ }
+ }
+
+ /**
* Low-level test of the smoothing projection, including the exceptions
* it throws when things are not going its way.
*/
@@ -463,6 +565,150 @@ public class TestSchemaSmoothing extends SubOperatorTest {
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
}
}
+
+ /**
+ * If using the legacy wildcard expansion, reuse the schema if partition paths
+ * are the same length.
+ */
+
+ @Test
+ public void testSamePartitionLength() {
+
+ // Set up the file metadata manager
+
+ Path filePathA = new Path("hdfs:///w/x/y/a.csv");
+ Path filePathB = new Path("hdfs:///w/x/y/b.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePathA, filePathB));
+
+ // Set up the scan level projection
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ ScanTestUtils.projectAllWithMetadata(2),
+ ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+ // Define the schema smoother
+
+ SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers(metadataManager));
+
+ TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+ {
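+ // First file: establishes the smoothed schema
+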
+ metadataManager.startFile(filePathA);
+ ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ }
+ {
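+ // Second file: same partition depth, so the prior schema is reused
+ // (schema version stays at 1)
+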
+ metadataManager.startFile(filePathB);
+ ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ }
+ }
+
+ /**
+ * If using the legacy wildcard expansion, reuse the schema if the new
+ * partition path is shorter than the previous one. (Unneeded partitions
+ * will be set to null by the scan projector.)
+ */
+
+ @Test
+ public void testShorterPartitionLength() {
+
+ // Set up the file metadata manager
+
+ Path filePathA = new Path("hdfs:///w/x/y/a.csv");
+ Path filePathB = new Path("hdfs:///w/x/b.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePathA, filePathB));
+
+ // Set up the scan level projection
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ ScanTestUtils.projectAllWithMetadata(2),
+ ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+ // Define the schema smoother
+
+ SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers(metadataManager));
+
+ TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+ {
+ metadataManager.startFile(filePathA);
+ ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ }
+ {
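+ // Second file: shorter partition path; the prior schema (with both
+ // partition columns) is reused and the unneeded partition is left null
+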
+ metadataManager.startFile(filePathB);
+ ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ }
+ }
+
+ /**
+ * If using the legacy wildcard expansion, the same schema can be reused
+ * even if the new partition path is longer than the previous one,
+ * because all file names are provided up front.
+ */
+
+ @Test
+ public void testLongerPartitionLength() {
+
+ // Set up the file metadata manager
+
+ Path filePathA = new Path("hdfs:///w/x/a.csv");
+ Path filePathB = new Path("hdfs:///w/x/y/b.csv");
+ FileMetadataManager metadataManager = new FileMetadataManager(
+ fixture.getOptionManager(),
+ new Path("hdfs:///w"),
+ Lists.newArrayList(filePathA, filePathB));
+
+ // Set up the scan level projection
+
+ ScanLevelProjection scanProj = new ScanLevelProjection(
+ ScanTestUtils.projectAllWithMetadata(2),
+ ScanTestUtils.parsers(metadataManager.projectionParser()));
+
+ // Define the schema smoother
+
+ SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers(metadataManager));
+
+ TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+ {
+ metadataManager.startFile(filePathA);
+ ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ }
+ {
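+ // Second file: longer partition path; the schema is still reused
+ // because all file paths were known to the metadata manager up front
+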
+ metadataManager.startFile(filePathB);
+ ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
+ }
+ }
+
/**
* Integrated test across multiple schemas at the batch level.
*/