author    | Paul Rogers <progers@cloudera.com>          | 2019-03-03 17:55:22 -0800
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2019-03-05 16:29:42 +0200
commit    | 7e3b45967dbb97da18ba49a2fa6a67a48e33b092 (patch)
tree      | 3f46493d29571675fbe3d4e839cbb7f15311fa08
parent    | 2c3e2de2f94fd3f21a11c22b7944b94953e4f397 (diff)
DRILL-7074: Scan framework fixes and enhancements
Roll-up of fixes and enhancements that emerged from the effort to host the CSV reader on the new framework.
closes #1676
42 files changed, 1726 insertions, 3229 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java index bbf12d4b8..b366d34d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java @@ -238,8 +238,11 @@ class ReaderState { /** * Prepare the schema for this reader. Called for the first reader within a - * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>. If - * this is an early-schema reader, then the result set loader already has + * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>. + * Asks the reader if it can provide a schema-only empty batch by calling + * the reader's <tt>defineSchema()</tt> method. If this is an early-schema + * reader, and it can provide a schema, then it should create an empty + * batch so that the the result set loader already has * the proper value vectors set up. If this is a late-schema reader, we must * read one batch to get the schema, then set aside the data for the next * call to <tt>next()</tt>. @@ -255,9 +258,10 @@ class ReaderState { * <li>If if turned out that the file was * empty when trying to read the schema, <tt>open()</tt> returned false * and this method should never be called.</tt> - * <li>Otherwise, if a schema was available, then the schema is already - * set up in the result set loader as the result of schema negotiation, and - * this method simply returns <tt>true</tt>. + * <li>Otherwise, the reader does not know if it is the first reader or + * not. The call to <tt>defineSchema()</tt> notifies the reader that it + * is the first one. The reader should set up in the result set loader + * with an empty batch. * </ul> * <p> * Semantics for late-schema readers: @@ -280,14 +284,12 @@ class ReaderState { protected boolean buildSchema() { - VectorContainer container = reader.output(); - - if (container != null) { + if (reader.defineSchema()) { // Bind the output container to the output of the scan operator. // This returns an empty batch with the schema filled in. - scanOp.containerAccessor.setContainer(container); + scanOp.containerAccessor.setContainer(reader.output()); schemaVersion = reader.schemaVersion(); return true; } @@ -297,7 +299,8 @@ class ReaderState { if (! next()) { return false; } - container = reader.output(); + VectorContainer container = reader.output(); + schemaVersion = reader.schemaVersion(); if (container.getRecordCount() == 0) { return true; } @@ -374,8 +377,7 @@ class ReaderState { private boolean readBatch() { - // Try to read a batch. This may fail. If so, clean up the - // mess. + // Try to read a batch. This may fail. If so, clean up the mess. boolean more; try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java index c0985b538..61de584f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java @@ -84,8 +84,6 @@ import org.apache.drill.exec.record.VectorContainer; public interface RowBatchReader { - enum Result { OK, LAST_BATCH, EOF } - /** * Name used when reporting errors. Can simply be the class name. 
* @@ -111,6 +109,22 @@ public interface RowBatchReader { boolean open(); /** + * Called for the first reader within a scan. Allows the reader to + * provide an empty batch with only the schema filled in. Readers that + * are "early schema" (know the schema up front) should return true + * and create an empty batch. Readers that are "late schema" should + * return false. In that case, the scan operator will ask the reader + * to load an actual data batch, and infer the schema from that batch. + * <p> + * This step is optional and is purely for performance. + * + * @return true if this reader can (and has) defined an empty batch + * to describe the schema, false otherwise + */ + + boolean defineSchema(); + + /** * Read the next batch. Reading continues until either EOF, * or until the mutator indicates that the batch is full. * The batch is considered valid if it is non-empty. Returning @@ -129,7 +143,7 @@ public interface RowBatchReader { * <tt>next()</tt> should be called again, <tt>false</tt> to indicate * that EOF was reached * - * @throws RutimeException (<tt>UserException</tt> preferred) if an + * @throws RuntimeException (<tt>UserException</tt> preferred) if an * error occurs that should fail the query. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java index 9e174146f..04b2c7eb3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java @@ -20,15 +20,23 @@ package org.apache.drill.exec.physical.impl.scan; import org.apache.drill.exec.ops.OperatorContext; /** - * Interface to the set of readers, and reader schema, that the - * scan operator manages. The reader factory creates and returns - * the readers to use for the scan, as determined by the specific - * physical plan. The reader factory also - * translates from the select list provided - * in the physical plan to the actual columns returned from the - * scan operator. The translation is reader-specific; this - * interface allows the scan operator to trigger various - * lifecycle events. + * Interface to the set of readers, and reader schema, that the scan operator + * manages. The reader factory creates and returns the readers to use for the + * scan, as determined by the specific physical plan. The reader factory also + * translates from the select list provided in the physical plan to the actual + * columns returned from the scan operator. The translation is reader-specific; + * this interface allows the scan operator to trigger various lifecycle events. + * <p> + * This interface decouples the scan implementation from the generic tasks + * needed to implement Drill's Volcano iterator protocol for operators, and + * Drill's schema and batch semantics. A scan implementation need only + * implement this interface to add plugin-specific scan behavior. + * <p> + * While this interface allows a wide variety of implementations, the intent is + * that most actual scanners will use the "managed" framework that handles the + * routine projection, vector management and other tasks that tend to be common + * across scanners. See {@link ScanSchemaOrchestrator} for the managed + * framework. 
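The defineSchema() addition to RowBatchReader above changes how the first reader in a scan announces its schema: instead of the scan probing output() for a container, it asks the reader directly. The free-standing sketch below shows the early-schema answer; it is not a verbatim implementation of RowBatchReader (only open(), defineSchema(), next(), output() and schemaVersion() are visible in this patch), and the container handling is illustrative only.

```java
import org.apache.drill.exec.record.VectorContainer;

// Sketch of an "early schema" reader against the contract quoted above.
// A late-schema reader would instead return false from defineSchema()
// and let the scan infer the schema from its first data batch.
public class EarlySchemaReaderSketch {

  private VectorContainer container;

  public boolean open() {
    // Allocate the (empty) vectors for the schema known up front.
    container = new VectorContainer();
    return true;                 // data is available to read
  }

  public boolean defineSchema() {
    // Early schema: the empty container built in open() already describes
    // the batch layout, so the first reader can ship a schema-only batch
    // before any rows are read.
    return true;
  }

  public boolean next() {
    // A real reader fills the vectors here; this sketch reports EOF.
    return false;
  }

  public VectorContainer output() { return container; }

  public int schemaVersion() { return 1; }
}
```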
*/ public interface ScanOperatorEvents { @@ -46,11 +54,25 @@ public interface ScanOperatorEvents { void bind(OperatorContext context); + /** + * A scanner typically readers multiple data sources (such as files or + * file blocks.) A batch reader handles each read. This method returns + * the next reader in whatever sequence that this scan defines. + * <p> + * The preferred implementation is to create each batch reader in this + * call to minimize resource usage. Production queries may read + * thousands of files or blocks, so incremental reader creation can be + * far more efficient than creating readers at the start of the scan. + * + * @return a batch reader for one of the scan elements within the + * scan physical plan for this scan operator + */ + RowBatchReader nextReader(); /** * Called when the scan operator itself is closed. Indicates that no more - * readers are available (or will be opened). + * readers are available. */ void close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java index f6c72b120..966a03901 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java @@ -35,14 +35,24 @@ import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput; * expands to `columns`.</li> * <li>If the columns array appears, then no other table columns * can appear.</li> - * <li>If the columns array appears, then the wildcard cannot also - * appear, unless that wildcard expanded to be `columns` as - * described above.</li> + * <li>Both 'columns' and the wildcard can appear for queries such + * as:<code><pre> + * select * from dfs.`multilevel/csv` + * where columns[1] < 1000</pre> + * </code></li> * <li>The query can select specific elements such as `columns`[2]. * In this case, only array elements can appear, not the unindexed * `columns` column.</li> + * <li>If is possible for `columns` to appear twice. In this case, + * the project operator will make a copy.</li> * </ul> * <p> + * To handle these cases, the general rule is: allow any number + * of wildcard or `columns` appearances in the input projection, but + * collapse them all down to a single occurrence of `columns` in the + * output projection. (Upstream code will prevent `columns` from + * appearing twice in its non-indexed form.) + * <p> * It falls to this parser to detect a not-uncommon user error, a * query such as the following:<pre><code> * SELECT max(columns[1]) AS col1 @@ -83,7 +93,8 @@ public class ColumnsArrayParser implements ScanProjectionParser { @Override public boolean parse(RequestedColumn inCol) { if (requireColumnsArray && inCol.isWildcard()) { - expandWildcard(); + createColumnsCol( + new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL)); return true; } if (! inCol.nameEquals(ColumnsArrayManager.COLUMNS_COL)) { @@ -113,41 +124,24 @@ public class ColumnsArrayParser implements ScanProjectionParser { .build(logger); } } - - // Special `columns` array column. 
- - columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol); - builder.addTableColumn(columnsArrayCol); + createColumnsCol(inCol); return true; } - /** - * Query contained SELECT *, and we know that the reader supports only - * the `columns` array; go ahead and expand the wildcard to the only - * possible column. - */ + private void createColumnsCol(RequestedColumn inCol) { + + // Special `columns` array column. Allow multiple, but + // project only one. - private void expandWildcard() { if (columnsArrayCol != null) { - throw UserException - .validationError() - .message("Cannot select columns[] and `*` together") - .build(logger); + return; } - columnsArrayCol = new UnresolvedColumnsArrayColumn( - new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL)); + columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol); builder.addTableColumn(columnsArrayCol); } @Override - public void validate() { - if (builder.hasWildcard() && columnsArrayCol != null) { - throw UserException - .validationError() - .message("Cannot select `columns` and `*` together") - .build(logger); - } - } + public void validate() { } @Override public void validateColumn(ColumnProjection col) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java index f9674dc2a..ae8502b52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl.scan.file; +import java.util.HashSet; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -38,6 +40,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { private final FileMetadataManager metadataManager; private final Pattern partitionPattern; private ScanLevelProjection builder; + private final Set<Integer> referencedPartitions = new HashSet<>(); // Output @@ -64,6 +67,11 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { if (defn != null) { return buildMetadataColumn(defn, inCol); } + if (inCol.isWildcard()) { + buildWildcard(); + + // Don't consider this a match. + } return false; } @@ -80,11 +88,18 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { // Partition column - builder.addMetadataColumn( - new PartitionColumn( - inCol.name(), - Integer.parseInt(m.group(1)))); - hasImplicitCols = true; + int partitionIndex = Integer.parseInt(m.group(1)); + if (! referencedPartitions.contains(partitionIndex)) { + builder.addMetadataColumn( + new PartitionColumn( + inCol.name(), + partitionIndex)); + + // Remember the partition for later wildcard expansion + + referencedPartitions.add(partitionIndex); + hasImplicitCols = true; + } return true; } @@ -107,8 +122,52 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { return true; } + private void buildWildcard() { + if (metadataManager.useLegacyWildcardExpansion && + metadataManager.useLegacyExpansionLocation) { + + // Star column: this is a SELECT * query. + + // Old-style wildcard handling inserts all partition columns in + // the scanner, removes them in Project. + // Fill in the file metadata columns. Can do here because the + // set is constant across all files. 
+ + expandPartitions(); + } + } + @Override - public void validate() { } + public void validate() { + + // Expand partitions if using a wildcard appears, if using the + // feature to expand partitions for wildcards, and we want the + // partitions after data columns. + + if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion && + ! metadataManager.useLegacyExpansionLocation) { + expandPartitions(); + } + } + + private void expandPartitions() { + + // Legacy wildcard expansion: include the file partitions for this file. + // This is a disadvantage for a * query: files at different directory + // levels will have different numbers of columns. Would be better to + // return this data as an array at some point. + // Append this after the *, keeping the * for later expansion. + + for (int i = 0; i < metadataManager.partitionCount(); i++) { + if (referencedPartitions.contains(i)) { + continue; + } + builder.addMetadataColumn(new PartitionColumn( + metadataManager.partitionName(i), i)); + referencedPartitions.add(i); + } + hasImplicitCols = true; + } @Override public void validateColumn(ColumnProjection outCol) { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java index fe4332a34..ba49a9f54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java @@ -53,6 +53,31 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes protected final String partitionDesignator; protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>(); protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap(); + + /** + * Indicates whether to expand partition columns when the query contains a wildcard. + * Supports queries such as the following:<code><pre> + * select * from dfs.`partitioned-dir` + * </pre><code> + * In which the output columns will be (columns, dir0) if the partitioned directory + * has one level of nesting. + * + * See {@link TestImplicitFileColumns#testImplicitColumns} + */ + protected final boolean useLegacyWildcardExpansion; + + /** + * In legacy mode, above, Drill expands partition columns whenever the + * wildcard appears. Drill 1.1 - 1.11 put expanded partition columns after + * data columns. This is actually a better position as it minimizes changes + * the row layout for files at different depths. Drill 1.12 moved them before + * data columns: at the location of the wildcard. + * <p> + * This flag, when set, uses the Drill 1.12 position. Later enhancements + * can unset this flag to go back to the future: use the preferred location + * after other columns. 
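The two new flags documented above control whether, and where, a wildcard query picks up the dir0, dir1, ... partition columns. A sketch of how a caller might choose between the Drill 1.12 behavior and the new default, using the constructors added in this patch; the option set, root path and file list are placeholders.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.hadoop.fs.Path;

public class MetadataManagerSetupSketch {

  // Drill 1.12 behavior: SELECT * also projects dir0, dir1, ..., inserted
  // at the wildcard's position in the output.
  static FileMetadataManager legacyStyle(OptionSet options) {
    Path root = new Path("file:/data/logs");                    // placeholder root
    List<Path> files = Arrays.asList(
        new Path("file:/data/logs/2019/01/a.csv"),              // placeholder files
        new Path("file:/data/logs/2019/02/b.csv"));
    return new FileMetadataManager(options,
        true,    // useLegacyWildcardExpansion
        true,    // useLegacyExpansionLocation
        root, files);
  }

  // New default via the convenience constructor added in this patch:
  // both flags off, so a wildcard no longer drags in partition columns.
  static FileMetadataManager currentStyle(OptionSet options, Path root, List<Path> files) {
    return new FileMetadataManager(options, root, files);
  }
}
```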
+ */ + protected final boolean useLegacyExpansionLocation; private final FileMetadataColumnsParser parser; // Internal state @@ -84,7 +109,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes */ public FileMetadataManager(OptionSet optionManager, + boolean useLegacyWildcardExpansion, + boolean useLegacyExpansionLocation, Path rootDir, List<Path> files) { + this.useLegacyWildcardExpansion = useLegacyWildcardExpansion; + this.useLegacyExpansionLocation = useLegacyExpansionLocation; scanRootDir = rootDir; partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); @@ -117,6 +146,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes } } + public FileMetadataManager(OptionSet optionManager, + Path rootDir, List<Path> files) { + this(optionManager, false, false, rootDir, files); + } + private int computeMaxPartition(List<Path> files) { int maxLen = 0; for (Path filePath : files) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java index 609e9f05f..6ecf0cf05 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java @@ -32,19 +32,46 @@ import org.apache.hadoop.mapred.FileSplit; /** * The file scan framework adds into the scan framework support for implicit - * file metadata columns. + * file metadata columns. The file scan framework brings together a number of + * components: + * <ul> + * <li>The projection list provided by the physical operator definition. This + * list identifies the set of "output" columns whih this framework is obliged + * to produce.</li> + * <li>The set of files and/or blocks to read.</li> + * <li>The file system configuration to use for working with the files + * or blocks.</li> + * <li>The factory class to create a reader for each of the files or blocks + * defined above. (Readers are created one-by-one as files are read.)</li> + * <li>Options as defined by the base class.</li> + * </ul> + * <p> + * @See {AbstractScanFramework} for details. */ public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiator> { - public interface FileReaderCreator { + /** + * Creates a batch reader on demand. Unlike earlier versions of Drill, + * this framework creates readers one by one, when they are needed. + * Doing so avoids excessive resource demands that come from creating + * potentially thousands of readers up front. + * <p> + * The reader itself is unique to each file type. This interface + * provides a common interface that this framework can use to create the + * file-specific reader on demand. + */ + + public interface FileReaderFactory { ManagedReader<FileSchemaNegotiator> makeBatchReader( DrillFileSystem dfs, FileSplit split) throws ExecutionSetupException; } /** - * Implementation of the file-level schema negotiator. + * Implementation of the file-level schema negotiator. At present, no + * file-specific features exist. This class shows, however, where we would + * add such features. 
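The FileReaderCreator-to-FileReaderFactory rename above stresses that readers are created one at a time, as each file is opened. Below is a sketch of wiring a FileScanFramework with a lambda factory; DoNothingReader stands in for a real plugin reader, and the import paths are approximate.

```java
import java.util.List;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.hadoop.conf.Configuration;

public class FileScanSetupSketch {

  public static FileScanFramework buildFramework(
      List<SchemaPath> projection,        // from the physical plan
      List<? extends FileWork> files,     // splits to read
      Configuration fsConf) {

    // The factory runs once per file, just before that file is read, so a
    // scan over thousands of files holds only one reader's resources at a time.
    FileReaderFactory factory = (dfs, split) -> new DoNothingReader();

    return new FileScanFramework(projection, files, fsConf, factory);
  }

  /** Placeholder for a real, file-type-specific reader. */
  private static class DoNothingReader implements ManagedReader<FileSchemaNegotiator> {
    @Override
    public boolean open(FileSchemaNegotiator negotiator) {
      negotiator.build();      // accept whatever schema negotiation produces
      return true;
    }
    @Override
    public boolean next() { return false; }   // no rows in this sketch
    @Override
    public void close() { }
  }
}
```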
*/ public static class FileSchemaNegotiatorImpl extends SchemaNegotiatorImpl @@ -55,12 +82,12 @@ public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiato } } - private final FileReaderCreator readerCreator; + private final FileReaderFactory readerCreator; public FileScanFramework(List<SchemaPath> projection, List<? extends FileWork> files, Configuration fsConf, - FileReaderCreator readerCreator) { + FileReaderFactory readerCreator) { super(projection, files, fsConf); this.readerCreator = readerCreator; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java index 4a15ff787..bf132651b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java @@ -44,4 +44,17 @@ public abstract class MetadataColumn extends ResolvedColumn implements ConstantC public String name() { return schema.getName(); } public abstract MetadataColumn resolve(FileMetadata fileInfo, VectorSource source, int sourceIndex); + + @Override + public String toString() { + return new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" schema=\"") + .append(schema.toString()) + .append(", value=") + .append(value) + .append("]") + .toString(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java index 54079013a..d285261c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java @@ -40,15 +40,91 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; * config options, and implements the matching "managed reader". All details * of setup, projection, and so on are handled by the framework and the components * that the framework builds upon. + * + * <h4>Inputs</h4> + * + * At this basic level, a scan framework requires just a few simple inputs: + * <ul> + * <li>The projection list provided by the physical operator definition. This + * list identifies the set of "output" columns whih this framework is obliged + * to produce.</li> + * <li>The operator context which provides access to a memory allocator and + * other plumbing items.</li> + * <li>A method to create a reader for each of the files or blocks + * defined above. (Readers are created one-by-one as files are read.)</li> + * <li>The data type to use for projected columns which the reader cannot + * provide. (Drill allows such columns and fills in null values: traditionally + * nullable Int, but customizable here.) + * <li>Various other options.</li> + * </ul> + * + * <h4>Orchestration</h4> + * + * The above is sufficient to drive the entire scan operator functionality. + * Projection is done generically and is the same for all files. Only the + * reader (created via the factory class) differs from one type of file to + * another. + * <p> + * The framework achieves the work described below= by composing a large + * set of detailed classes, each of which performs some specific task. This + * structure leaves the reader to simply infer schema and read data. 
+ * <p> + * In particular, rather than do all the orchestration here (which would tie + * that logic to the scan operation), the detailed work is delegated to the + * {@link ScanSchemaOrchestrator} class, with this class as a "shim" between + * the the Scan events API and the schema orchestrator implementation. + * + * <h4>Reader Integration</h4> + * + * The details of how a file is structured, how a schema is inferred, how + * data is decoded: all that is encapsulated in the reader. The only real + * Interaction between the reader and the framework is: + * <ul> + * <li>The reader "negotiates" a schema with the framework. The framework + * knows the projection list from the query plan, knows something about + * data types (whether a column should be scalar, a map or an array), and + * knows about the schema already defined by prior readers. The reader knows + * what schema it can produce (if "early schema.") The schema negotiator + * class handles this task.</li> + * <li>The reader reads data from the file and populates value vectors a + * batch at a time. The framework creates the result set loader to use for + * this work. The schema negotiator returns that loader to the reader, which + * uses it during read. + * <p> + * It is important to note that the result set loader also defines a schema: + * the schema requested by the reader. If the reader wants to read three + * columns, a, b, and c, then that is the schema that the result set loader + * supports. This is true even if the query plan only wants column a, or + * wants columns c, a. The framework handles the projection task so the + * reader does not have to worry about it. Reading an unwanted column + * is low cost: the result set loader will have provided a "dummy" column + * writer that simply discards the value. This is just as fast as having the + * reader use if-statements or a table to determine which columns to save. + * <p> + * A reader may be "late schema", true "schema on read." In this case, the + * reader simply tells the result set loader to create a new column reader + * on the fly. The framework will work out if that new column is to be + * projected and will return either a real column writer (projected column) + * or a dummy column writer (unprojected column.)</li> + * <li>The reader then reads batches of data until all data is read. The + * result set loader signals when a batch is full; the reader should not + * worry about this detail itself.</li> + * <li>The reader then releases its resources.</li> + * </ul> */ public abstract class AbstractScanFramework<T extends SchemaNegotiator> implements ScanOperatorEvents { + // Inputs + protected final List<SchemaPath> projection; protected MajorType nullType; protected int maxBatchRowCount; protected int maxBatchByteCount; protected OperatorContext context; + + // Internal state + protected ScanSchemaOrchestrator scanOrchestrator; public AbstractScanFramework(List<SchemaPath> projection) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java index 631881265..dead9cb46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java @@ -65,9 +65,13 @@ public interface SchemaNegotiator { * columns during the read. 
* * @param schema the table schema if known at open time + * @param isComplete true if the schema is complete: if it can be used + * to define an empty schema-only batch for the first reader. Set to + * false if the schema is partial: if the reader must read rows to + * determine the full schema */ - void setTableSchema(TupleMetadata schema); + void setTableSchema(TupleMetadata schema, boolean isComplete); /** * Set the preferred batch size (which may be overridden by the diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java index 46f363dd5..0841049fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java @@ -53,6 +53,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { protected final AbstractScanFramework<?> basicFramework; private final ShimBatchReader<? extends SchemaNegotiator> shim; protected TupleMetadata tableSchema; + protected boolean isSchemaComplete; protected int batchSize = ValueVector.MAX_ROW_COUNT; public SchemaNegotiatorImpl(AbstractScanFramework<?> framework, ShimBatchReader<? extends SchemaNegotiator> shim) { @@ -66,8 +67,9 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { } @Override - public void setTableSchema(TupleMetadata schema) { + public void setTableSchema(TupleMetadata schema, boolean isComplete) { tableSchema = schema; + this.isSchemaComplete = schema != null && isComplete; } @Override @@ -97,4 +99,6 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { public boolean isProjectionEmpty() { return basicFramework.scanOrchestrator().isProjectNone(); } + + public boolean isSchemaComplete() { return isSchemaComplete; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java index 0dc3c5736..a97b32968 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.framework; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.impl.scan.RowBatchReader; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; +import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.record.VectorContainer; @@ -44,6 +44,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead protected final AbstractScanFramework<T> manager; protected final ManagedReader<T> reader; protected final ReaderSchemaOrchestrator readerOrchestrator; + protected SchemaNegotiatorImpl schemaNegotiator; protected ResultSetLoader tableLoader; /** @@ -96,10 +97,19 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead } @Override + public boolean defineSchema() { + if (schemaNegotiator.isSchemaComplete()) { + readerOrchestrator.defineSchema(); + return true; + } + return false; + } + + 
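Tying the pieces together: calling setTableSchema(schema, true) in the negotiator is what lets ShimBatchReader.defineSchema() above answer true and deliver an empty, schema-only batch for the first reader. A sketch of an early-schema reader at the managed level; the column set is invented, and the row-set API calls should be read as illustrative rather than definitive.

```java
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;

public class TwoColumnReaderSketch implements ManagedReader<SchemaNegotiator> {

  private ResultSetLoader loader;
  private int rowCount;

  @Override
  public boolean open(SchemaNegotiator negotiator) {
    TupleMetadata schema = new SchemaBuilder()
        .add("id", MinorType.INT)
        .addNullable("name", MinorType.VARCHAR)
        .buildSchema();

    // isComplete = true: this is the whole schema, so the first reader in
    // the scan can emit an empty, schema-only batch via defineSchema().
    negotiator.setTableSchema(schema, true);
    loader = negotiator.build();
    return true;
  }

  @Override
  public boolean next() {
    RowSetLoader writer = loader.writer();
    while (!writer.isFull() && rowCount < 10) {
      writer.start();
      writer.scalar("id").setInt(rowCount);
      writer.scalar("name").setString("row-" + rowCount);
      writer.save();
      rowCount++;
    }
    return rowCount < 10;    // false once the demo rows are exhausted
  }

  @Override
  public void close() { }
}
```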
@Override public boolean next() { // The reader may report EOF, but the result set loader might - // have a lookhead row. + // have a lookahead row. if (eof && ! tableLoader.hasRows()) { return false; @@ -181,6 +191,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead } public ResultSetLoader build(SchemaNegotiatorImpl schemaNegotiator) { + this.schemaNegotiator = schemaNegotiator; readerOrchestrator.setBatchSize(schemaNegotiator.batchSize); tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.tableSchema); return tableLoader; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java index efd881bb1..096b8447b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java @@ -29,7 +29,7 @@ * is an old version without a new column c, while file B includes the column. * And so on. * <p> - * The scan operator here works to ensure schema continuity as much as + * The scan operator works to ensure schema continuity as much as * possible, smoothing out "soft" schema changes that are simply artifacts of * reading a collection of files. Only "hard" changes (true changes) are * passed downstream. @@ -157,5 +157,29 @@ * output batch in the order specified by the original SELECT list (or table order, * if the original SELECT had a wildcard.) Fortunately, this is just involves * moving around pointers to vectors; no actual data is moved during projection. + * + * <h4>Class Structure</h4> + * + * Some of the key classes here include: + * <ul> + * <li>{@link RowBatchReader} an extremely simple interface for reading data. + * We would like many developers to create new plugins and readers. The simplified + * interface pushes all complexity into the scan framework, leaving the reader to + * just read.</li> + * <li>{@link ShimBatchReader} an implementation of the above that converts from + * the simplified API to add additional structure to work with the result set loader. + * (The base interface is agnostic about how rows are read.)</li> + * <li>{@link ScheamNegotiator} and interface that allows a batch reader to + * "negotiate" a schema with the scan framework. The scan framework knows the + * columns that are to be projected. The reader knows what columns it can offer. + * The schema negotiator works out how to combine the two. It expresses the result + * as a result set loader. Column writers are defined for all columns that the + * reader wants to read, but only the materialized (projected) columns have actual + * vectors behind them. The non-projected columns are "free-wheeling" "dummy" + * writers. + * </li> + * + * And, yes, sorry for the terminology. File "readers" read from files, but + * use column "writers" to write to value vectors. 
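For the late-schema ("schema on read") case described in the package documentation above, the reader skips the up-front schema (or marks it incomplete) and adds columns to the result set loader as it discovers them; the framework hands back a real writer for projected columns and a discarding dummy writer otherwise. A sketch follows, assuming a MetadataUtils.newScalar helper for building column metadata; treat that call, and the single discovered column, as illustrative.

```java
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.MetadataUtils;

public class LateSchemaReaderSketch implements ManagedReader<SchemaNegotiator> {

  private ResultSetLoader loader;
  private boolean done;

  @Override
  public boolean open(SchemaNegotiator negotiator) {
    // No schema yet: pass null and mark it incomplete so the scan will
    // not ask this reader for a schema-only batch.
    negotiator.setTableSchema(null, false);
    loader = negotiator.build();
    return true;
  }

  @Override
  public boolean next() {
    if (done) {
      return false;
    }
    RowSetLoader writer = loader.writer();

    // "Discover" a column mid-read, as a JSON reader might. If the column
    // is not projected, the returned writer is a dummy that discards values.
    // (MetadataUtils.newScalar is assumed here as the column-metadata helper.)
    int amountCol = writer.addColumn(
        MetadataUtils.newScalar("amount", MinorType.FLOAT8, DataMode.OPTIONAL));

    writer.start();
    writer.scalar(amountCol).setDouble(12.5);
    writer.save();
    done = true;
    return false;           // EOF after this single demo row
  }

  @Override
  public void close() { }
}
```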
*/ package org.apache.drill.exec.physical.impl.scan.framework; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java index 3e302a1e8..d7de30ad6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java @@ -36,9 +36,90 @@ * <p> * See {@link ScanOperatorExec} for details of the scan operator protocol * and components. + * + * <h4>Traditional Class Structure<h4> + * The original design was simple: but required each reader to handle many + * detailed tasks. + * <pre><code> + * +------------+ +-----------+ + * | Scan Batch | +---> | ScanBatch | + * | Creator | | +-----------+ + * +------------+ | | + * | | | + * v | | + * +------------+ | v + * | Format | ---+ +---------------+ + * | Plugin | -----> | Record Reader | + * +------------+ +---------------+ + * + * </code></pre> + * + * The scan batch creator is unique to each storage plugin and is created + * based on the physical operator configuration ("pop config"). The + * scan batch creator delegates to the format plugin to create both the + * scan batch (the scan operator) and the set of readers which the scan + * batch will manage. + * <p> + * The scan batch + * provides a <code>Mutator</code> that creates the vectors used by the + * record readers. Schema continuity comes from reusing the Mutator from one + * file/block to the next. + * <p> + * One characteristic of this system is that all the record readers are + * created up front. If we must read 1000 blocks, we'll create 1000 record + * readers. Developers must be very careful to only allocate resources when + * the reader is opened, and release resources when the reader is closed. + * Else, resource bloat becomes a large problem. + * + * <h4>Revised Class Structure</h4> + * + * The new design is more complex because it divides tasks up into separate + * classes. The class structure is larger, but each class is smaller, more + * focused and does just one task. + * <pre><code> + * +------------+ +---------------+ + * | Scan Batch | -------> | Format Plugin | + * | Creator | +---------------+ + * +------------+ / | \ + * / | \ + * +---------------------+ | \ +---------------+ + * | OperatorRecordBatch | | +---->| ScanFramework | + * +---------------------+ | | +---------------+ + * v | | + * +------------------+ | + * | ScanOperatorExec | | + * +------------------+ v + * | +--------------+ + * +----------> | Batch Reader | + * +--------------+ + * </code></pre> + * + * Here, the scan batch creator again delegates to the format plugin. The + * format plugin creates three objects: + * <ul> + * <li>The <code>OperatorRecordBatch</code>, which encapsulates the Volcano + * iterator protocol. It also holds onto the output batch. This allows the + * operator implementation to just focus on its specific job.</li> + * <li>The <code>ScanOperatorExec</code> is the operator implementation for + * the new result-set-loader based scan.</li> + * <li>The scan framework is specific to each kind of reader. It handles + * everything which is unique to that reader. 
Rather than inheriting from + * the scan itself, the framework follows the strategy pattern: it says how + * to do a scan for the target format.<li> + * </ul> + * + * The overall structure uses the "composition" pattern: what is combined + * into a small set of classes in the traditional model is broken out into + * focused classes in the revised model. + * <p> + * A key part of the scan strategy is the batch reader. ("Batch" because + * it reads an entire batch at a time, using the result set loader.) The + * framework creates batch readers one by one as needed. Resource bloat + * is less of an issue because only one batch reader instance exists at + * any time for each scan operator instance. * <p> - * See the "managed" package for a reusable framework for handling the - * details of batches, schema and so on. + * Each of the above is further broken down into additional classes to + * handle projection and so on. */ package org.apache.drill.exec.physical.impl.scan; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java index 41cc59582..c0bcfa3b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; */ public class ExplicitSchemaProjection extends SchemaLevelProjection { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitSchemaProjection.class); public ExplicitSchemaProjection(ScanLevelProjection scanProj, TupleMetadata tableSchema, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java new file mode 100644 index 000000000..029b6a005 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder; +import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Orchestrates projection tasks for a single reader within the set that the + * scan operator manages. Vectors are reused across readers, but via a vector + * cache. All other state is distinct between readers. + */ + +public class ReaderSchemaOrchestrator implements VectorSource { + + private final ScanSchemaOrchestrator scanOrchestrator; + private int readerBatchSize; + private ResultSetLoaderImpl tableLoader; + private int prevTableSchemaVersion = -1; + + /** + * Assembles the table, metadata and null columns into the final output + * batch to be sent downstream. The key goal of this class is to "smooth" + * schema changes in this output batch by absorbing trivial schema changes + * that occur across readers. + */ + + private ResolvedRow rootTuple; + private VectorContainer tableContainer; + + public ReaderSchemaOrchestrator(ScanSchemaOrchestrator scanSchemaOrchestrator) { + scanOrchestrator = scanSchemaOrchestrator; + readerBatchSize = scanOrchestrator.scanBatchRecordLimit; + } + + public void setBatchSize(int size) { + if (size > 0) { + readerBatchSize = Math.min(size, scanOrchestrator.scanBatchRecordLimit); + } + } + + public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) { + OptionBuilder options = new OptionBuilder(); + options.setRowCountLimit(readerBatchSize); + options.setVectorCache(scanOrchestrator.vectorCache); + options.setBatchSizeLimit(scanOrchestrator.scanBatchByteLimit); + + // Set up a selection list if available and is a subset of + // table columns. (Only needed for non-wildcard queries.) + // The projection list includes all candidate table columns + // whether or not they exist in the up-front schema. Handles + // the odd case where the reader claims a fixed schema, but + // adds a column later. + + if (! scanOrchestrator.scanProj.projectAll()) { + options.setProjectionSet(scanOrchestrator.scanProj.readerProjection()); + } + options.setSchema(tableSchema); + + // Create the table loader + + tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator, options.build()); + return tableLoader; + } + + public boolean hasSchema() { + return prevTableSchemaVersion >= 0; + } + + public void defineSchema() { + tableLoader.startEmptyBatch(); + endBatch(); + } + + public void startBatch() { + tableLoader.startBatch(); + } + + /** + * Build the final output batch by projecting columns from the three input sources + * to the output batch. First, build the metadata and/or null columns for the + * table row count. Then, merge the sources. + */ + + public void endBatch() { + + // Get the batch results in a container. + + tableContainer = tableLoader.harvest(); + + // If the schema changed, set up the final projection based on + // the new (or first) schema. + + if (prevTableSchemaVersion < tableLoader.schemaVersion()) { + reviseOutputProjection(); + } else { + + // Fill in the null and metadata columns. 
+ + populateNonDataColumns(); + } + rootTuple.setRowCount(tableContainer.getRecordCount()); + } + + private void populateNonDataColumns() { + int rowCount = tableContainer.getRecordCount(); + scanOrchestrator.metadataManager.load(rowCount); + rootTuple.loadNulls(rowCount); + } + + /** + * Create the list of null columns by comparing the SELECT list against the + * columns available in the batch schema. Create null columns for those that + * are missing. This is done for the first batch, and any time the schema + * changes. (For early-schema, the projection occurs once as the schema is set + * up-front and does not change.) For a SELECT *, the null column check + * only need be done if null columns were created when mapping from a prior + * schema. + */ + + private void reviseOutputProjection() { + + // Do the table-schema level projection; the final matching + // of projected columns to available columns. + + TupleMetadata tableSchema = tableLoader.harvestSchema(); + if (scanOrchestrator.schemaSmoother != null) { + doSmoothedProjection(tableSchema); + } else if (scanOrchestrator.scanProj.hasWildcard()) { + doWildcardProjection(tableSchema); + } else { + doExplicitProjection(tableSchema); + } + + // Combine metadata, nulls and batch data to form the final + // output container. Columns are created by the metadata and null + // loaders only in response to a batch, so create the first batch. + + rootTuple.buildNulls(scanOrchestrator.vectorCache); + scanOrchestrator.metadataManager.define(); + populateNonDataColumns(); + rootTuple.project(tableContainer, scanOrchestrator.outputContainer); + prevTableSchemaVersion = tableLoader.schemaVersion(); + } + + private void doSmoothedProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow( + new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns)); + scanOrchestrator.schemaSmoother.resolve(tableSchema, rootTuple); + } + + /** + * Query contains a wildcard. The schema-level projection includes + * all columns provided by the reader. + */ + + private void doWildcardProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow(null); + new WildcardSchemaProjection(scanOrchestrator.scanProj, + tableSchema, rootTuple, scanOrchestrator.schemaResolvers); + } + + /** + * Explicit projection: include only those columns actually + * requested by the query, which may mean filling in null + * columns for projected columns that don't actually exist + * in the table. + * + * @param tableSchema newly arrived schema + */ + + private void doExplicitProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow( + new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns)); + new ExplicitSchemaProjection(scanOrchestrator.scanProj, + tableSchema, rootTuple, + scanOrchestrator.schemaResolvers); + } + + @Override + public ValueVector vector(int index) { + return tableContainer.getValueVector(index).getValueVector(); + } + + public void close() { + RuntimeException ex = null; + try { + if (tableLoader != null) { + tableLoader.close(); + tableLoader = null; + } + } + catch (RuntimeException e) { + ex = e; + } + try { + if (rootTuple != null) { + rootTuple.close(); + rootTuple = null; + } + } + catch (RuntimeException e) { + ex = ex == null ? 
e : ex; + } + scanOrchestrator.metadataManager.endFile(); + if (ex != null) { + throw ex; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java index 83d40a310..f90f722b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java @@ -100,8 +100,6 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl; public class ScanLevelProjection { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class); - /** * Interface for add-on parsers, avoids the need to create * a single, tightly-coupled parser for all types of columns. @@ -128,12 +126,26 @@ public class ScanLevelProjection { // Internal state + protected boolean includesWildcard; protected boolean sawWildcard; // Output protected List<ColumnProjection> outputCols = new ArrayList<>(); + + /** + * Projection definition for the scan a whole. Parsed form of the input + * projection list. + */ + protected RequestedTuple outputProjection; + + /** + * Projection definition passed to each reader. This is the set of + * columns that the reader is asked to provide. + */ + + protected RequestedTuple readerProjection; protected boolean hasWildcard; protected boolean emptyProjection = true; @@ -158,6 +170,18 @@ public class ScanLevelProjection { for (ScanProjectionParser parser : parsers) { parser.bind(this); } + + // First pass: check if a wildcard exists. + + for (RequestedColumn inCol : outputProjection.projections()) { + if (inCol.isWildcard()) { + includesWildcard = true; + break; + } + } + + // Second pass: process remaining columns. + for (RequestedColumn inCol : outputProjection.projections()) { if (inCol.isWildcard()) { mapWildcard(inCol); @@ -169,6 +193,23 @@ public class ScanLevelProjection { for (ScanProjectionParser parser : parsers) { parser.build(); } + + // Create the reader projection which includes either all columns + // (saw a wildcard) or just the unresolved columns (which excludes + // implicit columns.) + + List<RequestedColumn> outputProj; + if (hasWildcard()) { + outputProj = null; + } else { + outputProj = new ArrayList<>(); + for (ColumnProjection col : outputCols) { + if (col instanceof UnresolvedColumn) { + outputProj.add(((UnresolvedColumn) col).element()); + } + } + } + readerProjection = RequestedTupleImpl.build(outputProj); } /** @@ -181,6 +222,7 @@ public class ScanLevelProjection { // Wildcard column: this is a SELECT * query. + assert includesWildcard; if (sawWildcard) { throw new IllegalArgumentException("Duplicate * entry in project list"); } @@ -245,6 +287,15 @@ public class ScanLevelProjection { } } + // If the project list has a wildcard, and the column is not one recognized + // by the specialized parsers above, then just ignore it. It is likely a duplicate + // column name. In any event, it will be processed by the Project operator on + // top of this scan. + + if (includesWildcard) { + return; + } + // This is a desired table column. 
addTableColumn( @@ -281,15 +332,6 @@ public class ScanLevelProjection { for (ScanProjectionParser parser : parsers) { parser.validateColumn(outCol); } - switch (outCol.nodeType()) { - case UnresolvedColumn.UNRESOLVED: - if (hasWildcard()) { - throw new IllegalArgumentException("Cannot select table columns and * together"); - } - break; - default: - break; - } } } @@ -333,6 +375,8 @@ public class ScanLevelProjection { public RequestedTuple rootProjection() { return outputProjection; } + public RequestedTuple readerProjection() { return readerProjection; } + @Override public String toString() { return new StringBuilder() diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java index fe78f5a75..a5d6ca2a1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java @@ -23,15 +23,10 @@ import java.util.List; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver; -import org.apache.drill.exec.physical.rowSet.ResultSetLoader; -import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder; -import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl; import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.ValueVector; /** @@ -154,212 +149,6 @@ public class ScanSchemaOrchestrator { public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE; public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT; - /** - * Orchestrates projection tasks for a single reader with the set that the - * scan operator manages. Vectors are reused across readers, but via a vector - * cache. All other state is distinct between readers. - */ - - public class ReaderSchemaOrchestrator implements VectorSource { - - private int readerBatchSize; - private ResultSetLoaderImpl tableLoader; - private int prevTableSchemaVersion = -1; - - /** - * Assembles the table, metadata and null columns into the final output - * batch to be sent downstream. The key goal of this class is to "smooth" - * schema changes in this output batch by absorbing trivial schema changes - * that occur across readers. 
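The reworked ScanLevelProjection above now produces two views of the projection list: the full scan-level projection and a trimmed reader projection built from the unresolved (table) columns. A rough sketch of the distinction; in practice ScanSchemaOrchestrator builds this object and registers the metadata parsers, so constructing it directly is for illustration only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;

public class ProjectionSplitSketch {

  public static void main(String[] args) {
    // Projection list as the planner hands it to the scan operator.
    List<SchemaPath> projection = Arrays.asList(
        SchemaPath.getSimplePath("a"),
        SchemaPath.getSimplePath("b"),
        SchemaPath.getSimplePath("dir0"));

    // No add-on parsers registered here, so all three names remain
    // unresolved table columns and appear in both projections. With the
    // FileMetadataColumnsParser registered, dir0 would be claimed as a
    // metadata column and dropped from the reader projection.
    ScanLevelProjection scanProj =
        new ScanLevelProjection(projection, Collections.emptyList());

    RequestedTuple scanLevel = scanProj.rootProjection();     // what the scan must output
    RequestedTuple readerLevel = scanProj.readerProjection(); // what each reader must supply

    System.out.println(scanLevel.projections().size());      // 3
    System.out.println(readerLevel.projections().size());    // 3 here; 2 with the parser
  }
}
```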
- */ - - private ResolvedRow rootTuple; - private VectorContainer tableContainer; - - public ReaderSchemaOrchestrator() { - readerBatchSize = scanBatchRecordLimit; - } - - public void setBatchSize(int size) { - if (size > 0) { - readerBatchSize = Math.min(size, scanBatchRecordLimit); - } - } - - public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) { - OptionBuilder options = new OptionBuilder(); - options.setRowCountLimit(readerBatchSize); - options.setVectorCache(vectorCache); - options.setBatchSizeLimit(scanBatchByteLimit); - - // Set up a selection list if available and is a subset of - // table columns. (Only needed for non-wildcard queries.) - // The projection list includes all candidate table columns - // whether or not they exist in the up-front schema. Handles - // the odd case where the reader claims a fixed schema, but - // adds a column later. - - if (! scanProj.projectAll()) { - options.setProjectionSet(scanProj.rootProjection()); - } - options.setSchema(tableSchema); - - // Create the table loader - - tableLoader = new ResultSetLoaderImpl(allocator, options.build()); - - // If a schema is given, create a zero-row batch to announce the - // schema downstream in the form of an empty batch. - - if (tableSchema != null) { - tableLoader.startEmptyBatch(); - endBatch(); - } - - return tableLoader; - } - - public boolean hasSchema() { - return prevTableSchemaVersion >= 0; - } - - public void startBatch() { - tableLoader.startBatch(); - } - - /** - * Build the final output batch by projecting columns from the three input sources - * to the output batch. First, build the metadata and/or null columns for the - * table row count. Then, merge the sources. - */ - - public void endBatch() { - - // Get the batch results in a container. - - tableContainer = tableLoader.harvest(); - - // If the schema changed, set up the final projection based on - // the new (or first) schema. - - if (prevTableSchemaVersion < tableLoader.schemaVersion()) { - reviseOutputProjection(); - } else { - - // Fill in the null and metadata columns. - - populateNonDataColumns(); - } - rootTuple.setRowCount(tableContainer.getRecordCount()); - } - - private void populateNonDataColumns() { - int rowCount = tableContainer.getRecordCount(); - metadataManager.load(rowCount); - rootTuple.loadNulls(rowCount); - } - - /** - * Create the list of null columns by comparing the SELECT list against the - * columns available in the batch schema. Create null columns for those that - * are missing. This is done for the first batch, and any time the schema - * changes. (For early-schema, the projection occurs once as the schema is set - * up-front and does not change.) For a SELECT *, the null column check - * only need be done if null columns were created when mapping from a prior - * schema. - */ - - private void reviseOutputProjection() { - - // Do the table-schema level projection; the final matching - // of projected columns to available columns. - - TupleMetadata tableSchema = tableLoader.harvestSchema(); - if (schemaSmoother != null) { - doSmoothedProjection(tableSchema); - } else if (scanProj.hasWildcard()) { - doWildcardProjection(tableSchema); - } else { - doExplicitProjection(tableSchema); - } - - // Combine metadata, nulls and batch data to form the final - // output container. Columns are created by the metadata and null - // loaders only in response to a batch, so create the first batch. 
- - rootTuple.buildNulls(vectorCache); - metadataManager.define(); - populateNonDataColumns(); - rootTuple.project(tableContainer, outputContainer); - prevTableSchemaVersion = tableLoader.schemaVersion(); - } - - private void doSmoothedProjection(TupleMetadata tableSchema) { - rootTuple = new ResolvedRow( - new NullColumnBuilder(nullType, allowRequiredNullColumns)); - schemaSmoother.resolve(tableSchema, rootTuple); - } - - /** - * Query contains a wildcard. The schema-level projection includes - * all columns provided by the reader. - */ - - private void doWildcardProjection(TupleMetadata tableSchema) { - rootTuple = new ResolvedRow(null); - new WildcardSchemaProjection(scanProj, - tableSchema, rootTuple, schemaResolvers); - } - - /** - * Explicit projection: include only those columns actually - * requested by the query, which may mean filling in null - * columns for projected columns that don't actually exist - * in the table. - * - * @param tableSchema newly arrived schema - */ - - private void doExplicitProjection(TupleMetadata tableSchema) { - rootTuple = new ResolvedRow( - new NullColumnBuilder(nullType, allowRequiredNullColumns)); - new ExplicitSchemaProjection(scanProj, - tableSchema, rootTuple, - schemaResolvers); - } - - @Override - public ValueVector vector(int index) { - return tableContainer.getValueVector(index).getValueVector(); - } - - public void close() { - RuntimeException ex = null; - try { - if (tableLoader != null) { - tableLoader.close(); - tableLoader = null; - } - } - catch (RuntimeException e) { - ex = e; - } - try { - if (rootTuple != null) { - rootTuple.close(); - rootTuple = null; - } - } - catch (RuntimeException e) { - ex = ex == null ? e : ex; - } - metadataManager.endFile(); - if (ex != null) { - throw ex; - } - } - } - // Configuration /** @@ -367,16 +156,16 @@ public class ScanSchemaOrchestrator { * not set, the null type is the Drill default. */ - private MajorType nullType; + MajorType nullType; /** * Creates the metadata (file and directory) columns, if needed. */ - private MetadataManager metadataManager; - private final BufferAllocator allocator; - private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT; - private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT; + MetadataManager metadataManager; + final BufferAllocator allocator; + int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT; + int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT; private final List<ScanProjectionParser> parsers = new ArrayList<>(); /** @@ -389,7 +178,7 @@ public class ScanSchemaOrchestrator { List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>(); private boolean useSchemaSmoothing; - private boolean allowRequiredNullColumns; + boolean allowRequiredNullColumns; // Internal state @@ -402,14 +191,14 @@ public class ScanSchemaOrchestrator { * vectors rather than vector instances, this cache can be deprecated. 
*/ - private ResultVectorCacheImpl vectorCache; - private ScanLevelProjection scanProj; + ResultVectorCacheImpl vectorCache; + ScanLevelProjection scanProj; private ReaderSchemaOrchestrator currentReader; - private SchemaSmoother schemaSmoother; + SchemaSmoother schemaSmoother; // Output - private VectorContainer outputContainer; + VectorContainer outputContainer; public ScanSchemaOrchestrator(BufferAllocator allocator) { this.allocator = allocator; @@ -493,20 +282,12 @@ public class ScanSchemaOrchestrator { ScanProjectionParser parser = metadataManager.projectionParser(); if (parser != null) { - - // For compatibility with Drill 1.12, insert the file metadata - // parser before others so that, in a wildcard query, metadata - // columns appear before others (such as the `columns` column.) - // This is temporary and should be removed once the test framework - // is restored to Drill 1.11 functionality. - parsers.add(parser); } // Parse the projection list. scanProj = new ScanLevelProjection(projection, parsers); - if (scanProj.hasWildcard() && useSchemaSmoothing) { schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers); } @@ -526,7 +307,7 @@ public class ScanSchemaOrchestrator { public ReaderSchemaOrchestrator startReader() { closeReader(); - currentReader = new ReaderSchemaOrchestrator(); + currentReader = new ReaderSchemaOrchestrator(this); return currentReader; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java index c7bae278e..a75611432 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java @@ -61,8 +61,6 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; public class SchemaLevelProjection { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class); - /** * Schema-level projection is customizable. Implement this interface, and * add an instance to the scan orchestrator, to perform custom mappings @@ -81,10 +79,7 @@ public class SchemaLevelProjection { protected SchemaLevelProjection( List<SchemaProjectionResolver> resolvers) { - if (resolvers == null) { - resolvers = new ArrayList<>(); - } - this.resolvers = resolvers; + this.resolvers = resolvers == null ? new ArrayList<>() : resolvers; for (SchemaProjectionResolver resolver : resolvers) { resolver.startResolution(); } @@ -97,6 +92,8 @@ public class SchemaLevelProjection { return; } } - throw new IllegalStateException("No resolver for column: " + col.nodeType()); + throw new IllegalStateException( + String.format("No resolver for column `%s` of type %d", + col.name(), col.nodeType())); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java index a5a52c075..155fcf886 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java @@ -22,6 +22,56 @@ * or may be "missing" with null values applied. The code here prepares * a run-time projection plan based on the actual table schema. * <p> + * Looks at schema as a set of transforms. 
+ * <ul>
+ * <li>Scan-level projection list from the query plan: The list of columns
+ * (or the wildcard) as requested by the user in the query. The planner
+ * determines which columns to project. In Drill, projection is speculative:
+ * it is a list of names which the planner hopes will appear in the data
+ * files. The reader must make up columns (the infamous nullable INT) when
+ * it turns out that no such column exists. Otherwise, the reader must figure
+ * out the data type for any column that does exist.
+ * <p>
+ * The scan project list defines the set of columns which the scan operator
+ * is obliged to send downstream. Ideally, the scan operator sends exactly the
+ * same schema (the project list with types filled in) for all batches. Since
+ * batches may come from different files, the scan operator is obligated to
+ * unify the schemas from those files (or blocks.)</li>
+ * <li>Reader (file)-level projection occurs for each reader. A single scan
+ * may use multiple readers to read data. Each reader may offer more information
+ * about the schema. For example, a Parquet reader can obtain schema information
+ * from the Parquet headers. A JDBC reader obtains schema information from the
+ * returned schema. This is called "early schema." File-based readers can at least
+ * add implicit file or partition columns.
+ * <p>
+ * The result is a refined schema: the scan-level schema with more information
+ * filled in. For Parquet, all projection information can be filled in. For
+ * CSV or JSON, we can only add file metadata information, but not yet the
+ * actual data schema.</li>
+ * <li>Batch-level schema: once a reader reads actual data, it now knows
+ * exactly what it read. This is the "schema on read" model. Thus, after reading
+ * a batch, any remaining uncertainty about the projected schema is removed.
+ * The actual data defines the data types and so on.
+ * <p>
+ * Readers such as JSON and CSV are "late schema": they don't know the data
+ * schema until they read the file. This is true "schema on read." Further, for
+ * JSON, the data may change from one batch to the next as the reader "discovers"
+ * fields that did not appear in earlier batches. This requires some amount of
+ * "schema smoothing": the ability to preserve a consistent output schema even
+ * as the input schema jiggles around some.</li>
+ * </ul>
+ * <p>
+ * The goal of this mechanism is to handle the above use cases cleanly, in a
+ * common set of classes, and to avoid the need for each reader to figure out
+ * all these issues for itself (as was the case with earlier versions of
+ * Drill.)
+ * <p>
+ * Because these issues are complex, the code itself is complex. To make the
+ * code easier to manage, each bit of functionality is encapsulated in a
+ * distinct class. Classes combine via composition to create a "framework"
+ * suitable for each kind of reader: whether it be early or late schema,
+ * file-based or something else, etc. 
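To make the early-schema path above concrete, a minimal reader sketch follows. It is illustrative only: the class name, columns, and rows are invented, and the exact ManagedReader contract is assumed; the negotiator calls (setTableSchema(), build()) mirror those used by the test readers elsewhere in this patch.

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;

// Hypothetical early-schema reader: it knows its columns before reading any
// data, so it declares them to the schema negotiator and lets the framework
// handle projection, null columns, and implicit (file metadata) columns.
public class ExampleEarlySchemaReader implements ManagedReader<FileSchemaNegotiator> {

  private ResultSetLoader tableLoader;
  private boolean done;

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addNullable("b", MinorType.VARCHAR)
        .buildSchema();
    negotiator.setTableSchema(schema, true); // true: the schema is complete
    tableLoader = negotiator.build();
    return true;                             // the file is not empty
  }

  @Override
  public boolean next() {
    if (done) {
      return false;                          // no more batches
    }
    RowSetLoader writer = tableLoader.writer();
    writer.addRow(1, "wilma");               // a late-schema reader would instead
    writer.addRow(2, "betty");               // discover columns as it reads
    done = true;
    return true;
  }

  @Override
  public void close() { }
}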
+ * <p> * The core concept is one of successive refinement of the project * list through a set of rewrites: * <ul> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java index c1e383eb6..f464bae72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java @@ -37,7 +37,7 @@ public class ImpliedTupleRequest implements RequestedTuple { new ImpliedTupleRequest(false); public static final List<RequestedColumn> EMPTY_COLS = new ArrayList<>(); - private boolean allProjected; + private final boolean allProjected; public ImpliedTupleRequest(boolean allProjected) { this.allProjected = allProjected; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java index cd782c766..4643c57d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java @@ -73,7 +73,7 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace; public class RequestedTupleImpl implements RequestedTuple { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class); private final RequestedColumnImpl parent; private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>(); @@ -86,6 +86,13 @@ public class RequestedTupleImpl implements RequestedTuple { this.parent = parent; } + public RequestedTupleImpl(List<RequestedColumn> cols) { + parent = null; + for (RequestedColumn col : cols) { + projection.add(col.name(), col); + } + } + @Override public RequestedColumn get(String colName) { return projection.get(colName.toLowerCase()); @@ -119,10 +126,43 @@ public class RequestedTupleImpl implements RequestedTuple { } /** + * Create a requested tuple projection from a rewritten top-level + * projection list. The columns within the list have already been parsed to + * pick out arrays, maps and scalars. The list must not include the + * wildcard: a wildcard list must be passed in as a null list. An + * empty list means project nothing. Null list means project all, else + * project only the columns in the list. + * + * @param projList top-level, parsed columns + * @return the tuple projection for the top-leel row + */ + + public static RequestedTuple build(List<RequestedColumn> projList) { + if (projList == null) { + return new ImpliedTupleRequest(true); + } + if (projList.isEmpty()) { + return new ImpliedTupleRequest(false); + } + return new RequestedTupleImpl(projList); + } + + /** * Parse a projection list. The list should consist of a list of column names; - * any wildcards should have been processed by the caller. An empty list means + * or wildcards. An empty list means * nothing is projected. A null list means everything is projected (that is, a * null list here is equivalent to a wildcard in the SELECT statement.) + * <p> + * The projection list may include both a wildcard and column names (as in + * the case of implicit columns.) 
This results in a final list that both + * says that everything is projected, and provides the list of columns. + * <p> + * Parsing is used at two different times. First, to parse the list from + * the physical operator. This has the case above: an explicit wildcard + * and/or additional columns. Then, this class is used again to prepare the + * physical projection used when reading. In this case, wildcards should + * be removed, implicit columns pulled out, and just the list of read-level + * columns should remain. * * @param projList * the list of projected columns, or null if no projection is to be diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java index 3587e2711..88ccd3c41 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java @@ -22,33 +22,42 @@ import static org.junit.Assert.fail; import java.util.List; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager; import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager; +import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.RowSetUtilities; import org.apache.hadoop.fs.Path; import org.junit.Test; - +import org.junit.experimental.categories.Category; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +/** + * Test the "columns" array mechanism integrated with the scan schema + * orchestrator including simulating reading data. + */ + +@Category(RowSetTests.class) public class TestColumnsArray extends SubOperatorTest { - /** - * Test columns array. The table must be able to support it by having a - * matching column. 
- */ + private static class MockScanner { + ScanSchemaOrchestrator scanner; + ReaderSchemaOrchestrator reader; + ResultSetLoader loader; + } - @Test - public void testColumnsArray() { + private MockScanner buildScanner(List<SchemaPath> projList) { + + MockScanner mock = new MockScanner(); // Set up the file metadata manager @@ -64,21 +73,19 @@ public class TestColumnsArray extends SubOperatorTest { // Configure the schema orchestrator - ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(fixture.allocator()); - scanner.withMetadata(metadataManager); - scanner.addParser(colsManager.projectionParser()); - scanner.addResolver(colsManager.resolver()); + mock.scanner = new ScanSchemaOrchestrator(fixture.allocator()); + mock.scanner.withMetadata(metadataManager); + mock.scanner.addParser(colsManager.projectionParser()); + mock.scanner.addResolver(colsManager.resolver()); - // SELECT filename, columns, dir0 ... + // SELECT <proj list> ... - scanner.build(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, - ColumnsArrayManager.COLUMNS_COL, - ScanTestUtils.partitionColName(0))); + mock.scanner.build(projList); // FROM z.csv metadataManager.startFile(filePath); - ReaderSchemaOrchestrator reader = scanner.startReader(); + mock.reader = mock.scanner.startReader(); // Table schema (columns: VARCHAR[]) @@ -86,7 +93,25 @@ public class TestColumnsArray extends SubOperatorTest { .addArray(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR) .buildSchema(); - ResultSetLoader loader = reader.makeTableLoader(tableSchema); + mock.loader = mock.reader.makeTableLoader(tableSchema); + + // First empty batch + + mock.reader.defineSchema(); + return mock; + } + + /** + * Test columns array. The table must be able to support it by having a + * matching column. + */ + + @Test + public void testColumnsArray() { + + MockScanner mock = buildScanner(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, + ColumnsArrayManager.COLUMNS_COL, + ScanTestUtils.partitionColName(0))); // Verify empty batch. @@ -99,18 +124,18 @@ public class TestColumnsArray extends SubOperatorTest { SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) .build(); - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + assertNotNull(mock.scanner.output()); + RowSetUtilities.verify(expected, + fixture.wrap(mock.scanner.output())); } // Create a batch of data. - reader.startBatch(); - loader.writer() + mock.reader.startBatch(); + mock.loader.writer() .addRow(new Object[] {new String[] {"fred", "flintstone"}}) .addRow(new Object[] {new String[] {"barney", "rubble"}}); - reader.endBatch(); + mock. reader.endBatch(); // Verify @@ -120,11 +145,100 @@ public class TestColumnsArray extends SubOperatorTest { .addRow("z.csv", new String[] {"barney", "rubble"}, "x") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(mock.scanner.output())); } - scanner.close(); + mock.scanner.close(); + } + + @Test + public void testWildcard() { + + MockScanner mock = buildScanner(RowSetTestUtils.projectAll()); + + // Verify empty batch. + + TupleMetadata expectedSchema = new SchemaBuilder() + .addArray("columns", MinorType.VARCHAR) + .buildSchema(); + { + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .build(); + + assertNotNull(mock.scanner.output()); + RowSetUtilities.verify(expected, + fixture.wrap(mock.scanner.output())); + } + + // Create a batch of data. 
+ + mock.reader.startBatch(); + mock.loader.writer() + .addRow(new Object[] {new String[] {"fred", "flintstone"}}) + .addRow(new Object[] {new String[] {"barney", "rubble"}}); + mock. reader.endBatch(); + + // Verify + + { + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addSingleCol(new String[] {"fred", "flintstone"}) + .addSingleCol(new String[] {"barney", "rubble"}) + .build(); + + RowSetUtilities.verify(expected, + fixture.wrap(mock.scanner.output())); + } + + mock.scanner.close(); + } + + @Test + public void testWildcardAndFileMetadata() { + + MockScanner mock = buildScanner(RowSetTestUtils.projectList( + ScanTestUtils.FILE_NAME_COL, + SchemaPath.DYNAMIC_STAR, + ScanTestUtils.partitionColName(0))); + + // Verify empty batch. + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("filename", MinorType.VARCHAR) + .addArray("columns", MinorType.VARCHAR) + .addNullable("dir0", MinorType.VARCHAR) + .buildSchema(); + { + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .build(); + + assertNotNull(mock.scanner.output()); + RowSetUtilities.verify(expected, + fixture.wrap(mock.scanner.output())); + } + + // Create a batch of data. + + mock.reader.startBatch(); + mock.loader.writer() + .addRow(new Object[] {new String[] {"fred", "flintstone"}}) + .addRow(new Object[] {new String[] {"barney", "rubble"}}); + mock. reader.endBatch(); + + // Verify + + { + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("z.csv", new String[] {"fred", "flintstone"}, "x") + .addRow("z.csv", new String[] {"barney", "rubble"}, "x") + .build(); + + RowSetUtilities.verify(expected, + fixture.wrap(mock.scanner.output())); + } + + mock.scanner.close(); } private ScanSchemaOrchestrator buildScan(List<SchemaPath> cols) { @@ -160,6 +274,7 @@ public class TestColumnsArray extends SubOperatorTest { try { ReaderSchemaOrchestrator reader = scanner.startReader(); reader.makeTableLoader(tableSchema); + reader.defineSchema(); fail(); } catch (IllegalStateException e) { // Expected @@ -180,6 +295,7 @@ public class TestColumnsArray extends SubOperatorTest { try { ReaderSchemaOrchestrator reader = scanner.startReader(); reader.makeTableLoader(tableSchema); + reader.defineSchema(); fail(); } catch (IllegalStateException e) { // Expected diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java index 4f32b56e0..e7a0188cd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -46,7 +47,7 @@ import org.apache.drill.test.SubOperatorTest; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileSplit; import org.junit.Test; - +import org.junit.experimental.categories.Category; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -54,6 +55,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; * Test the columns-array specific behavior in the columns scan framework. 
*/ +@Category(RowSetTests.class) public class TestColumnsArrayFramework extends SubOperatorTest { private static final Path MOCK_FILE_PATH = new Path("file:/w/x/y/z.csv"); @@ -101,7 +103,7 @@ public class TestColumnsArrayFramework extends SubOperatorTest { @Override public boolean open(ColumnsSchemaNegotiator negotiator) { this.negotiator = negotiator; - negotiator.setTableSchema(schema); + negotiator.setTableSchema(schema, true); negotiator.build(); return true; } @@ -115,6 +117,11 @@ public class TestColumnsArrayFramework extends SubOperatorTest { public void close() { } } + /** + * Test including a column other than "columns". Occurs when + * using implicit columns. + */ + @Test public void testNonColumnsProjection() { @@ -143,6 +150,10 @@ public class TestColumnsArrayFramework extends SubOperatorTest { scanFixture.close(); } + /** + * Test projecting just the `columns` column. + */ + @Test public void testColumnsProjection() { @@ -171,6 +182,10 @@ public class TestColumnsArrayFramework extends SubOperatorTest { scanFixture.close(); } + /** + * Test including a specific index of `columns` such as + * `columns`[1]. + */ @Test public void testColumnsIndexProjection() { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java index e2c8a21d3..d1e91a283 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager; @@ -35,9 +36,10 @@ import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; import org.apache.drill.test.SubOperatorTest; import org.apache.hadoop.fs.Path; import org.junit.Test; - +import org.junit.experimental.categories.Category; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +@Category(RowSetTests.class) public class TestColumnsArrayParser extends SubOperatorTest { /** @@ -255,5 +257,45 @@ public class TestColumnsArrayParser extends SubOperatorTest { assertEquals(FileMetadataColumn.ID, scanProj.columns().get(2).nodeType()); } - // TODO: Test Columns element projection + /** + * If a query is of the form: + * <pre><code> + * select * from dfs.`multilevel/csv` where columns[1] < 1000 + * </code><pre> + * Then the projection list passed to the scan operator + * includes both the wildcard and the `columns` array. + * We can ignore one of them. 
+ */ + + @Test + public void testWildcardAndColumns() { + ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList( + SchemaPath.DYNAMIC_STAR, + ColumnsArrayManager.COLUMNS_COL), + ScanTestUtils.parsers(new ColumnsArrayParser(true))); + + assertFalse(scanProj.projectAll()); + assertEquals(2, scanProj.requestedCols().size()); + + assertEquals(1, scanProj.columns().size()); + assertEquals(ColumnsArrayManager.COLUMNS_COL, scanProj.columns().get(0).name()); + + // Verify column type + + assertEquals(UnresolvedColumnsArrayColumn.ID, scanProj.columns().get(0).nodeType()); + } + + @Test + public void testColumnsAsMap() { + try { + new ScanLevelProjection( + RowSetTestUtils.projectList("columns.x"), + ScanTestUtils.parsers(new ColumnsArrayParser(true))); + fail(); + } + catch (UserException e) { + // Expected + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java deleted file mode 100644 index 330a66115..000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestConstantColumnLoader.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.impl.scan; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.impl.scan.file.FileMetadata; -import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn; -import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumnDefn; -import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn; -import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader; -import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec; -import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.metadata.SchemaBuilder; -import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns; -import org.apache.drill.test.SubOperatorTest; -import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetComparison; -import org.apache.hadoop.fs.Path; -import org.junit.Test; - -public class TestConstantColumnLoader extends SubOperatorTest { - - private static class DummyColumn implements ConstantColumnSpec { - - private String name; - private MaterializedField schema; - private String value; - - public DummyColumn(String name, MajorType type, String value) { - this.name = name; - this.schema = MaterializedField.create(name, type); - this.value = value; - } - - @Override - public String name() { return name; } - - @Override - public MaterializedField schema() { return schema; } - - @Override - public String value() { return value; } - } - - /** - * Test the static column loader using one column of each type. - * The null column is of type int, but the associated value is of - * type string. This is a bit odd, but works out because we detect that - * the string value is null and call setNull on the writer, and avoid - * using the actual data. 
- */ - - @Test - public void testConstantColumnLoader() { - - MajorType aType = MajorType.newBuilder() - .setMinorType(MinorType.VARCHAR) - .setMode(DataMode.REQUIRED) - .build(); - MajorType bType = MajorType.newBuilder() - .setMinorType(MinorType.VARCHAR) - .setMode(DataMode.OPTIONAL) - .build(); - - List<ConstantColumnSpec> defns = new ArrayList<>(); - defns.add( - new DummyColumn("a", aType, "a-value" )); - defns.add( - new DummyColumn("b", bType, "b-value" )); - - ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); - ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns); - - // Create a batch - - staticLoader.load(2); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add("a", aType) - .add("b", bType) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow("a-value", "b-value") - .addRow("a-value", "b-value") - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(staticLoader.load(2))); - staticLoader.close(); - } - - @Test - public void testFileMetadata() { - - FileMetadata fileInfo = new FileMetadata(new Path("hdfs:///w/x/y/z.csv"), new Path("hdfs:///w")); - List<ConstantColumnSpec> defns = new ArrayList<>(); - FileMetadataColumnDefn iDefn = new FileMetadataColumnDefn( - ScanTestUtils.SUFFIX_COL, ImplicitFileColumns.SUFFIX); - FileMetadataColumn iCol = new FileMetadataColumn(ScanTestUtils.SUFFIX_COL, - iDefn, fileInfo, null, 0); - defns.add(iCol); - - String partColName = ScanTestUtils.partitionColName(1); - PartitionColumn pCol = new PartitionColumn(partColName, 1, fileInfo, null, 0); - defns.add(pCol); - - ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); - ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns); - - // Create a batch - - staticLoader.load(2); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR) - .addNullable(partColName, MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow("csv", "y") - .addRow("csv", "y") - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(staticLoader.load(2))); - staticLoader.close(); - } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java index 08aeed53f..a6de5e6f3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataColumnParser.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.List; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn; import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager; @@ -33,9 +34,10 @@ import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; import org.apache.drill.test.SubOperatorTest; import org.apache.hadoop.fs.Path; import org.junit.Test; - +import org.junit.experimental.categories.Category; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +@Category(RowSetTests.class) public class TestFileMetadataColumnParser extends SubOperatorTest 
{ @Test @@ -135,8 +137,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { assertEquals(PartitionColumn.ID, scanProj.columns().get(0).nodeType()); } + /** + * Test wildcard expansion. + */ + @Test - public void testWildcard() { + public void testRevisedWildcard() { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), @@ -153,15 +159,45 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { } /** + * Legacy (prior version) wildcard expansion always expands partition + * columns. + */ + + @Test + public void testLegacyWildcard() { + Path filePath = new Path("hdfs:///w/x/y/z.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + true, // Use legacy wildcard expansion + true, // Put partitions at end + new Path("hdfs:///w"), + Lists.newArrayList(filePath)); + + ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + Lists.newArrayList(metadataManager.projectionParser())); + + List<ColumnProjection> cols = scanProj.columns(); + assertEquals(3, cols.size()); + assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(1).nodeType()); + assertEquals(0, ((PartitionColumn) cols.get(1)).partition()); + assertEquals(PartitionColumn.ID, cols.get(2).nodeType()); + assertEquals(1, ((PartitionColumn) cols.get(2)).partition()); + } + + /** * Combine wildcard and file metadata columms. The wildcard expands * table columns but not metadata columns. */ @Test - public void testWildcardAndFileMetadata() { + public void testLegacyWildcardAndFileMetadata() { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + true, // Use legacy wildcard expansion + false, // Put partitions at end new Path("hdfs:///w"), Lists.newArrayList(filePath)); @@ -173,10 +209,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Lists.newArrayList(metadataManager.projectionParser())); List<ColumnProjection> cols = scanProj.columns(); - assertEquals(3, cols.size()); + assertEquals(5, cols.size()); assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType()); assertEquals(FileMetadataColumn.ID, cols.get(1).nodeType()); assertEquals(FileMetadataColumn.ID, cols.get(2).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(3).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(4).nodeType()); } /** @@ -185,10 +223,12 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { */ @Test - public void testWildcardAndFileMetadataMixed() { + public void testLegacyWildcardAndFileMetadataMixed() { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), + true, // Use legacy wildcard expansion + false, // Put partitions at end new Path("hdfs:///w"), Lists.newArrayList(filePath)); @@ -200,18 +240,25 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { Lists.newArrayList(metadataManager.projectionParser())); List<ColumnProjection> cols = scanProj.columns(); - assertEquals(3, cols.size()); + assertEquals(5, cols.size()); assertEquals(FileMetadataColumn.ID, cols.get(0).nodeType()); assertEquals(UnresolvedColumn.WILDCARD, cols.get(1).nodeType()); assertEquals(FileMetadataColumn.ID, cols.get(2).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(3).nodeType()); + 
assertEquals(PartitionColumn.ID, cols.get(4).nodeType()); } /** - * Include both a wildcard and a partition column. + * Include both a wildcard and a partition column. The wildcard, in + * legacy mode, will create partition columns for any partitions not + * mentioned in the project list. + * <p> + * Tests proposed functionality: included only requested partition + * columns. */ @Test - public void testWildcardAndPartition() { + public void testRevisedWildcardAndPartition() { Path filePath = new Path("hdfs:///w/x/y/z.csv"); FileMetadataManager metadataManager = new FileMetadataManager( fixture.getOptionManager(), @@ -229,6 +276,113 @@ public class TestFileMetadataColumnParser extends SubOperatorTest { assertEquals(PartitionColumn.ID, cols.get(1).nodeType()); } + @Test + public void testLegacyWildcardAndPartition() { + Path filePath = new Path("hdfs:///w/x/y/z.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + true, // Use legacy wildcard expansion + true, // Put partitions at end + new Path("hdfs:///w"), + Lists.newArrayList(filePath)); + + ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, + ScanTestUtils.partitionColName(8)), + Lists.newArrayList(metadataManager.projectionParser())); + + List<ColumnProjection> cols = scanProj.columns(); + assertEquals(4, cols.size()); + assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(1).nodeType()); + assertEquals(0, ((PartitionColumn) cols.get(1)).partition()); + assertEquals(PartitionColumn.ID, cols.get(2).nodeType()); + assertEquals(1, ((PartitionColumn) cols.get(2)).partition()); + assertEquals(PartitionColumn.ID, cols.get(3).nodeType()); + assertEquals(8, ((PartitionColumn) cols.get(3)).partition()); + } + + @Test + public void testPreferredPartitionExpansion() { + Path filePath = new Path("hdfs:///w/x/y/z.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + true, // Use legacy wildcard expansion + false, // Put partitions at end + new Path("hdfs:///w"), + Lists.newArrayList(filePath)); + + ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, + ScanTestUtils.partitionColName(8)), + Lists.newArrayList(metadataManager.projectionParser())); + + List<ColumnProjection> cols = scanProj.columns(); + assertEquals(4, cols.size()); + assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(1).nodeType()); + assertEquals(8, ((PartitionColumn) cols.get(1)).partition()); + assertEquals(PartitionColumn.ID, cols.get(2).nodeType()); + assertEquals(0, ((PartitionColumn) cols.get(2)).partition()); + assertEquals(PartitionColumn.ID, cols.get(3).nodeType()); + assertEquals(1, ((PartitionColumn) cols.get(3)).partition()); + } + + /** + * Test a case like:<br> + * <code>SELECT *, dir1 FROM ...</code><br> + * The projection list includes "dir1". The wildcard will + * fill in "dir0". 
+ */ + + @Test + public void testLegacyWildcardAndPartitionWithOverlap() { + Path filePath = new Path("hdfs:///w/x/y/z.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + true, // Use legacy wildcard expansion + true, // Put partitions at end + new Path("hdfs:///w"), + Lists.newArrayList(filePath)); + + ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, + ScanTestUtils.partitionColName(1)), + Lists.newArrayList(metadataManager.projectionParser())); + + List<ColumnProjection> cols = scanProj.columns(); + assertEquals(3, cols.size()); + assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(1).nodeType()); + assertEquals(0, ((PartitionColumn) cols.get(1)).partition()); + assertEquals(PartitionColumn.ID, cols.get(2).nodeType()); + assertEquals(1, ((PartitionColumn) cols.get(2)).partition()); + } + + @Test + public void testPreferedWildcardExpansionWithOverlap() { + Path filePath = new Path("hdfs:///w/x/y/z.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + true, // Use legacy wildcard expansion + false, // Put partitions at end + new Path("hdfs:///w"), + Lists.newArrayList(filePath)); + + ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, + ScanTestUtils.partitionColName(1)), + Lists.newArrayList(metadataManager.projectionParser())); + + List<ColumnProjection> cols = scanProj.columns(); + assertEquals(3, cols.size()); + assertEquals(UnresolvedColumn.WILDCARD, cols.get(0).nodeType()); + assertEquals(PartitionColumn.ID, cols.get(1).nodeType()); + assertEquals(1, ((PartitionColumn) cols.get(1)).partition()); + assertEquals(PartitionColumn.ID, cols.get(2).nodeType()); + assertEquals(0, ((PartitionColumn) cols.get(2)).partition()); + } + /** * Verify that names that look like metadata columns, but appear * to be maps or arrays, are not interpreted as metadata. 
That is, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java index 9161932d4..314bc2a13 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileMetadataProjection.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import java.util.List; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.impl.scan.file.FileMetadata; @@ -46,9 +47,10 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.test.SubOperatorTest; import org.apache.hadoop.fs.Path; import org.junit.Test; - +import org.junit.experimental.categories.Category; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +@Category(RowSetTests.class) public class TestFileMetadataProjection extends SubOperatorTest { @Test diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java index a10e7665a..2fdf3e637 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -32,7 +33,7 @@ import org.apache.drill.exec.physical.impl.scan.TestScanOperatorExec.AbstractSca import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework; import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework.FileSchemaNegotiator; import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework; -import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderCreator; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.RowSetLoader; @@ -50,6 +51,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileSplit; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Tests the file metadata extensions to the file operator framework. @@ -57,6 +59,7 @@ import org.junit.Test; * verified the underlying mechanisms. 
*/ +@Category(RowSetTests.class) public class TestFileScanFramework extends SubOperatorTest { private static final String MOCK_FILE_NAME = "foo.csv"; @@ -117,7 +120,7 @@ public class TestFileScanFramework extends SubOperatorTest { } } - public static class FileScanOpFixture extends BaseFileScanOpFixture implements FileReaderCreator { + public static class FileScanOpFixture extends BaseFileScanOpFixture implements FileReaderFactory { protected final List<MockFileReader> readers = new ArrayList<>(); protected Iterator<MockFileReader> readerIter; @@ -252,7 +255,7 @@ public class TestFileScanFramework extends SubOperatorTest { .add("a", MinorType.INT) .addNullable("b", MinorType.VARCHAR, 10) .buildSchema(); - schemaNegotiator.setTableSchema(schema); + schemaNegotiator.setTableSchema(schema, true); tableLoader = schemaNegotiator.build(); return true; } @@ -486,7 +489,7 @@ public class TestFileScanFramework extends SubOperatorTest { .add("b", MinorType.INT) .resumeSchema() .buildSchema(); - schemaNegotiator.setTableSchema(schema); + schemaNegotiator.setTableSchema(schema, true); tableLoader = schemaNegotiator.build(); return true; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java deleted file mode 100644 index a41305221..000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestNullColumnLoader.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.impl.scan; - -import static org.junit.Assert.assertSame; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; -import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn; -import org.apache.drill.exec.physical.rowSet.ResultVectorCache; -import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl; -import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.metadata.SchemaBuilder; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.test.SubOperatorTest; -import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetComparison; -import org.junit.Test; - -public class TestNullColumnLoader extends SubOperatorTest { - - private ResolvedNullColumn makeNullCol(String name, MajorType nullType) { - - // For this test, we don't need the projection, so just - // set it to null. - - return new ResolvedNullColumn(name, nullType, null, 0); - } - - private ResolvedNullColumn makeNullCol(String name) { - return makeNullCol(name, null); - } - - /** - * Test the simplest case: default null type, nothing in the vector - * cache. Specify no column type, the special NULL type, or a - * predefined type. Output types should be set accordingly. 
- */ - - @Test - public void testBasics() { - - List<ResolvedNullColumn> defns = new ArrayList<>(); - defns.add(makeNullCol("unspecified", null)); - defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL))); - defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR))); - defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR))); - defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR))); - - ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); - NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false); - - // Create a batch - - VectorContainer output = staticLoader.load(2); - - // Verify values and types - - BatchSchema expectedSchema = new SchemaBuilder() - .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE) - .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE) - .addNullable("specifiedOpt", MinorType.VARCHAR) - .addNullable("specifiedReq", MinorType.VARCHAR) - .addArray("specifiedArray", MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(null, null, null, null, new String[] {}) - .addRow(null, null, null, null, new String[] {}) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(output)); - staticLoader.close(); - } - - @Test - public void testCustomNullType() { - - List<ResolvedNullColumn> defns = new ArrayList<>(); - defns.add(makeNullCol("unspecified", null)); - defns.add(makeNullCol("nullType", MajorType.newBuilder() - .setMinorType(MinorType.NULL) - .setMode(DataMode.OPTIONAL) - .build())); - defns.add(makeNullCol("nullTypeReq", MajorType.newBuilder() - .setMinorType(MinorType.NULL) - .setMode(DataMode.REQUIRED) - .build())); - - // Null type array does not make sense, so is not tested. - - ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); - MajorType nullType = MajorType.newBuilder() - .setMinorType(MinorType.VARCHAR) - .setMode(DataMode.OPTIONAL) - .build(); - NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false); - - // Create a batch - - VectorContainer output = staticLoader.load(2); - - // Verify values and types - - BatchSchema expectedSchema = new SchemaBuilder() - .add("unspecified", nullType) - .add("nullType", nullType) - .add("nullTypeReq", nullType) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(null, null, null) - .addRow(null, null, null) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(output)); - staticLoader.close(); - } - - @Test - public void testCachedTypesMapToNullable() { - - List<ResolvedNullColumn> defns = new ArrayList<>(); - defns.add(makeNullCol("req")); - defns.add(makeNullCol("opt")); - defns.add(makeNullCol("rep")); - defns.add(makeNullCol("unk")); - - // Populate the cache with a column of each mode. - - ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); - cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED)); - ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL)); - ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED)); - - // Use nullable Varchar for unknown null columns. 
- - MajorType nullType = MajorType.newBuilder() - .setMinorType(MinorType.VARCHAR) - .setMode(DataMode.OPTIONAL) - .build(); - NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false); - - // Create a batch - - VectorContainer output = staticLoader.load(2); - - // Verify vectors are reused - - assertSame(opt, output.getValueVector(1).getValueVector()); - assertSame(rep, output.getValueVector(2).getValueVector()); - - // Verify values and types - - BatchSchema expectedSchema = new SchemaBuilder() - .addNullable("req", MinorType.FLOAT8) - .addNullable("opt", MinorType.FLOAT8) - .addArray("rep", MinorType.FLOAT8) - .addNullable("unk", MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(null, null, new int[] { }, null) - .addRow(null, null, new int[] { }, null) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(output)); - staticLoader.close(); - } - - @Test - public void testCachedTypesAllowRequired() { - - List<ResolvedNullColumn> defns = new ArrayList<>(); - defns.add(makeNullCol("req")); - defns.add(makeNullCol("opt")); - defns.add(makeNullCol("rep")); - defns.add(makeNullCol("unk")); - - // Populate the cache with a column of each mode. - - ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); - cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED)); - ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL)); - ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED)); - - // Use nullable Varchar for unknown null columns. - - MajorType nullType = MajorType.newBuilder() - .setMinorType(MinorType.VARCHAR) - .setMode(DataMode.OPTIONAL) - .build(); - NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true); - - // Create a batch - - VectorContainer output = staticLoader.load(2); - - // Verify vectors are reused - - assertSame(opt, output.getValueVector(1).getValueVector()); - assertSame(rep, output.getValueVector(2).getValueVector()); - - // Verify values and types - - BatchSchema expectedSchema = new SchemaBuilder() - .add("req", MinorType.FLOAT8) - .addNullable("opt", MinorType.FLOAT8) - .addArray("rep", MinorType.FLOAT8) - .addNullable("unk", MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(0.0, null, new int[] { }, null) - .addRow(0.0, null, new int[] { }, null) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(output)); - staticLoader.close(); - } - - @Test - public void testNullColumnBuilder() { - - ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); - NullColumnBuilder builder = new NullColumnBuilder(null, false); - - builder.add("unspecified"); - builder.add("nullType", Types.optional(MinorType.NULL)); - builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR)); - builder.add("specifiedReq", Types.required(MinorType.VARCHAR)); - builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR)); - builder.build(cache); - - // Create a batch - - builder.load(2); - - // Verify values and types - - BatchSchema expectedSchema = new SchemaBuilder() - .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE) - .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE) - .addNullable("specifiedOpt", MinorType.VARCHAR) - .addNullable("specifiedReq", MinorType.VARCHAR) - .addArray("specifiedArray", 
MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(null, null, null, null, new String[] {}) - .addRow(null, null, null, null, new String[] {}) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(builder.output())); - builder.close(); - } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java deleted file mode 100644 index 079455789..000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestRowBatchMerger.java +++ /dev/null @@ -1,459 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.scan; - -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; -import org.apache.drill.exec.physical.impl.scan.project.VectorSource; -import org.apache.drill.exec.physical.rowSet.ResultVectorCache; -import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.metadata.SchemaBuilder; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.test.SubOperatorTest; -import org.apache.drill.test.rowSet.RowSet; -import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetComparison; -import org.junit.Test; - -import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue; -import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap; -import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray; - - -/** - * Test the row batch merger by merging two batches. Tests both the - * "direct" and "exchange" cases. Direct means that the output container - * contains the source vector directly: they are the same vectors. - * Exchange means we have two vectors, but we swap the underlying - * Drillbufs to effectively shift data from source to destination - * vector. 
- */ - -public class TestRowBatchMerger extends SubOperatorTest { - - public static class RowSetSource implements VectorSource { - - private SingleRowSet rowSet; - - public RowSetSource(SingleRowSet rowSet) { - this.rowSet = rowSet; - } - - public RowSet rowSet() { return rowSet; } - - public void clear() { - rowSet.clear(); - } - - @Override - public ValueVector vector(int index) { - return rowSet.container().getValueVector(index).getValueVector(); - } - } - - private RowSetSource makeFirst() { - BatchSchema firstSchema = new SchemaBuilder() - .add("d", MinorType.VARCHAR) - .add("a", MinorType.INT) - .build(); - return new RowSetSource( - fixture.rowSetBuilder(firstSchema) - .addRow("barney", 10) - .addRow("wilma", 20) - .build()); - } - - private RowSetSource makeSecond() { - BatchSchema secondSchema = new SchemaBuilder() - .add("b", MinorType.INT) - .add("c", MinorType.VARCHAR) - .build(); - return new RowSetSource( - fixture.rowSetBuilder(secondSchema) - .addRow(1, "foo.csv") - .addRow(2, "foo.csv") - .build()); - } - - public static class TestProjection extends ResolvedColumn { - - public TestProjection(VectorSource source, int sourceIndex) { - super(source, sourceIndex); - } - - @Override - public String name() { return null; } - - @Override - public int nodeType() { return -1; } - - @Override - public MaterializedField schema() { return null; } - } - - @Test - public void testSimpleFlat() { - - // Create the first batch - - RowSetSource first = makeFirst(); - - // Create the second batch - - RowSetSource second = makeSecond(); - - ResolvedRow resolvedTuple = new ResolvedRow(null); - resolvedTuple.add(new TestProjection(first, 1)); - resolvedTuple.add(new TestProjection(second, 0)); - resolvedTuple.add(new TestProjection(second, 1)); - resolvedTuple.add(new TestProjection(first, 0)); - - // Do the merge - - VectorContainer output = new VectorContainer(fixture.allocator()); - resolvedTuple.project(null, output); - output.setRecordCount(first.rowSet().rowCount()); - RowSet result = fixture.wrap(output); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.INT) - .add("c", MinorType.VARCHAR) - .add("d", MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(10, 1, "foo.csv", "barney") - .addRow(20, 2, "foo.csv", "wilma") - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(result); - } - - @Test - public void testImplicitFlat() { - - // Create the first batch - - RowSetSource first = makeFirst(); - - // Create the second batch - - RowSetSource second = makeSecond(); - - ResolvedRow resolvedTuple = new ResolvedRow(null); - resolvedTuple.add(new TestProjection(resolvedTuple, 1)); - resolvedTuple.add(new TestProjection(second, 0)); - resolvedTuple.add(new TestProjection(second, 1)); - resolvedTuple.add(new TestProjection(resolvedTuple, 0)); - - // Do the merge - - VectorContainer output = new VectorContainer(fixture.allocator()); - resolvedTuple.project(first.rowSet().container(), output); - output.setRecordCount(first.rowSet().rowCount()); - RowSet result = fixture.wrap(output); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.INT) - .add("c", MinorType.VARCHAR) - .add("d", MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(10, 1, "foo.csv", "barney") - .addRow(20, 2, "foo.csv", "wilma") - .build(); - - new RowSetComparison(expected) - 
.verifyAndClearAll(result); - } - - @Test - public void testFlatWithNulls() { - - // Create the first batch - - RowSetSource first = makeFirst(); - - // Create null columns - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - - ResolvedRow resolvedTuple = new ResolvedRow(builder); - resolvedTuple.add(new TestProjection(resolvedTuple, 1)); - resolvedTuple.add(resolvedTuple.nullBuilder().add("null1")); - resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); - resolvedTuple.add(new TestProjection(resolvedTuple, 0)); - - // Build the null values - - ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); - builder.build(cache); - builder.load(first.rowSet().rowCount()); - - // Do the merge - - VectorContainer output = new VectorContainer(fixture.allocator()); - resolvedTuple.project(first.rowSet().container(), output); - output.setRecordCount(first.rowSet().rowCount()); - RowSet result = fixture.wrap(output); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .addNullable("null1", MinorType.INT) - .addNullable("null2", MinorType.VARCHAR) - .add("d", MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(10, null, null, "barney") - .addRow(20, null, null, "wilma") - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(result); - builder.close(); - } - - /** - * Test the ability to create maps from whole cloth if requested in - * the projection list, and the map is not available from the data - * source. - */ - - @Test - public void testNullMaps() { - - // Create the first batch - - RowSetSource first = makeFirst(); - - // Create null columns - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow resolvedTuple = new ResolvedRow(builder); - - resolvedTuple.add(new TestProjection(resolvedTuple, 1)); - - ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1"); - ResolvedTuple nullMap = nullMapCol.members(); - nullMap.add(nullMap.nullBuilder().add("null1")); - nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); - - ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2"); - ResolvedTuple nullMap2 = nullMapCol2.members(); - nullMap2.add(nullMap2.nullBuilder().add("null3")); - nullMap.add(nullMapCol2); - - resolvedTuple.add(nullMapCol); - resolvedTuple.add(new TestProjection(resolvedTuple, 0)); - - // Build the null values - - ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); - resolvedTuple.buildNulls(cache); - - // LoadNulls - - resolvedTuple.loadNulls(first.rowSet().rowCount()); - - // Do the merge - - VectorContainer output = new VectorContainer(fixture.allocator()); - resolvedTuple.project(first.rowSet().container(), output); - resolvedTuple.setRowCount(first.rowSet().rowCount()); - RowSet result = fixture.wrap(output); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .addMap("map1") - .addNullable("null1", MinorType.INT) - .addNullable("null2", MinorType.VARCHAR) - .addMap("map2") - .addNullable("null3", MinorType.INT) - .resumeMap() - .resumeSchema() - .add("d", MinorType.VARCHAR) - .build(); - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(10, mapValue(null, null, singleMap(null)), "barney") - .addRow(20, mapValue(null, null, singleMap(null)), "wilma") - .build(); - - new RowSetComparison(expected) - 
.verifyAndClearAll(result); - resolvedTuple.close(); - } - - /** - * Test that the merger mechanism can rewrite a map to include - * projected null columns. - */ - - @Test - public void testMapRevision() { - - // Create the first batch - - BatchSchema inputSchema = new SchemaBuilder() - .add("b", MinorType.VARCHAR) - .addMap("a") - .add("c", MinorType.INT) - .resumeSchema() - .build(); - RowSetSource input = new RowSetSource( - fixture.rowSetBuilder(inputSchema) - .addRow("barney", singleMap(10)) - .addRow("wilma", singleMap(20)) - .build()); - - // Create mappings - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow resolvedTuple = new ResolvedRow(builder); - - resolvedTuple.add(new TestProjection(resolvedTuple, 0)); - ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, - inputSchema.getColumn(1), 1); - resolvedTuple.add(mapCol); - ResolvedTuple map = mapCol.members(); - map.add(new TestProjection(map, 0)); - map.add(map.nullBuilder().add("null1")); - - // Build the null values - - ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); - resolvedTuple.buildNulls(cache); - - // LoadNulls - - resolvedTuple.loadNulls(input.rowSet().rowCount()); - - // Do the merge - - VectorContainer output = new VectorContainer(fixture.allocator()); - resolvedTuple.project(input.rowSet().container(), output); - output.setRecordCount(input.rowSet().rowCount()); - RowSet result = fixture.wrap(output); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add("b", MinorType.VARCHAR) - .addMap("a") - .add("c", MinorType.INT) - .addNullable("null1", MinorType.INT) - .resumeSchema() - .build(); - RowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow("barney", mapValue(10, null)) - .addRow("wilma", mapValue(20, null)) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(result); - } - - /** - * Test that the merger mechanism can rewrite a map array to include - * projected null columns. 
- */ - - @Test - public void testMapArrayRevision() { - - // Create the first batch - - BatchSchema inputSchema = new SchemaBuilder() - .add("b", MinorType.VARCHAR) - .addMapArray("a") - .add("c", MinorType.INT) - .resumeSchema() - .build(); - RowSetSource input = new RowSetSource( - fixture.rowSetBuilder(inputSchema) - .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12))) - .addRow("wilma", mapArray(singleMap(20), singleMap(21))) - .build()); - - // Create mappings - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow resolvedTuple = new ResolvedRow(builder); - - resolvedTuple.add(new TestProjection(resolvedTuple, 0)); - ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, - inputSchema.getColumn(1), 1); - resolvedTuple.add(mapCol); - ResolvedTuple map = mapCol.members(); - map.add(new TestProjection(map, 0)); - map.add(map.nullBuilder().add("null1")); - - // Build the null values - - ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); - resolvedTuple.buildNulls(cache); - - // LoadNulls - - resolvedTuple.loadNulls(input.rowSet().rowCount()); - - // Do the merge - - VectorContainer output = new VectorContainer(fixture.allocator()); - resolvedTuple.project(input.rowSet().container(), output); - output.setRecordCount(input.rowSet().rowCount()); - RowSet result = fixture.wrap(output); - - // Verify - - BatchSchema expectedSchema = new SchemaBuilder() - .add("b", MinorType.VARCHAR) - .addMapArray("a") - .add("c", MinorType.INT) - .addNullable("null1", MinorType.INT) - .resumeSchema() - .build(); - RowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow("barney", mapArray( - mapValue(10, null), mapValue(11, null), mapValue(12, null))) - .addRow("wilma", mapArray( - mapValue(20, null), mapValue(21, null))) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(result); - } - -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java index 912a7172e..03587cb08 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanBatchWriters.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl.scan; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.AbstractSubScan; @@ -34,6 +35,7 @@ import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.Test; +import org.junit.experimental.categories.Category; import io.netty.buffer.DrillBuf; @@ -42,6 +44,7 @@ import io.netty.buffer.DrillBuf; * set follows the same semantics as the original set. 
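The @Category(RowSetTests.class) markers added to this and the other test classes in these hunks use JUnit 4's category mechanism, so the whole group of row-set-based tests can be included or excluded as one unit. A minimal sketch of the pattern, using only the imports already shown above; the class name here is hypothetical:

    import org.apache.drill.categories.RowSetTests;
    import org.junit.experimental.categories.Category;

    // Tag the class; the test methods themselves are unchanged.
    @Category(RowSetTests.class)
    public class SomeRowSetTest extends SubOperatorTest {
      // ...
    }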
*/ +@Category(RowSetTests.class) public class TestScanBatchWriters extends SubOperatorTest { @Test diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java deleted file mode 100644 index 558f761c9..000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanLevelProjection.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.scan; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection; -import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn; -import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; -import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn; -import org.apache.drill.exec.record.metadata.ProjectionType; -import org.apache.drill.test.SubOperatorTest; -import org.junit.Test; - -/** - * Test the level of projection done at the level of the scan as a whole; - * before knowledge of table "implicit" columns or the specific table schema. - */ - -public class TestScanLevelProjection extends SubOperatorTest { - - /** - * Basic test: select a set of columns (a, b, c) when the - * data source has an early schema of (a, c, d). (a, c) are - * projected, (d) is null. - */ - - @Test - public void testBasics() { - - // Simulate SELECT a, b, c ... 
- // Build the projection plan and verify - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a", "b", "c"), - ScanTestUtils.parsers()); - assertFalse(scanProj.projectAll()); - assertFalse(scanProj.projectNone()); - - assertEquals(3, scanProj.requestedCols().size()); - assertEquals("a", scanProj.requestedCols().get(0).rootName()); - assertEquals("b", scanProj.requestedCols().get(1).rootName()); - assertEquals("c", scanProj.requestedCols().get(2).rootName()); - - assertEquals(3, scanProj.columns().size()); - assertEquals("a", scanProj.columns().get(0).name()); - assertEquals("b", scanProj.columns().get(1).name()); - assertEquals("c", scanProj.columns().get(2).name()); - - // Verify column type - - assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType()); - } - - @Test - public void testMap() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"), - ScanTestUtils.parsers()); - assertFalse(scanProj.projectAll()); - assertFalse(scanProj.projectNone()); - - assertEquals(3, scanProj.columns().size()); - assertEquals("a", scanProj.columns().get(0).name()); - assertEquals("b", scanProj.columns().get(1).name()); - assertEquals("c", scanProj.columns().get(2).name()); - - // Verify column type - - assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType()); - - // Map structure - - RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element(); - assertTrue(a.isTuple()); - assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x")); - assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y")); - assertEquals(ProjectionType.UNPROJECTED, a.mapProjection().projectionType("z")); - - RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element(); - assertTrue(c.isSimple()); - } - - @Test - public void testArray() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a[1]", "a[3]"), - ScanTestUtils.parsers()); - assertFalse(scanProj.projectAll()); - assertFalse(scanProj.projectNone()); - - assertEquals(1, scanProj.columns().size()); - assertEquals("a", scanProj.columns().get(0).name()); - - // Verify column type - - assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType()); - - // Map structure - - RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element(); - assertTrue(a.isArray()); - assertFalse(a.hasIndex(0)); - assertTrue(a.hasIndex(1)); - assertFalse(a.hasIndex(2)); - assertTrue(a.hasIndex(3)); - } - - /** - * Simulate a SELECT * query by passing "**" (Drill's internal version - * of the wildcard) as a column name. - */ - - @Test - public void testWildcard() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - assertTrue(scanProj.projectAll()); - assertFalse(scanProj.projectNone()); - assertEquals(1, scanProj.requestedCols().size()); - assertTrue(scanProj.requestedCols().get(0).isDynamicStar()); - - assertEquals(1, scanProj.columns().size()); - assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name()); - - // Verify bindings - - assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName()); - - // Verify column type - - assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType()); - } - - /** - * Test an empty projection which occurs in a - * SELECT COUNT(*) query. 
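Both cases described above are resolved before any table schema is known, purely from what the projection list contains. A minimal sketch using the test utilities named in these hunks, no new names introduced:

    // SELECT * : Drill's internal wildcard "**" yields a single unresolved
    // wildcard column, and projectAll() reports true.
    ScanLevelProjection wildcard = new ScanLevelProjection(
        RowSetTestUtils.projectAll(),
        ScanTestUtils.parsers());

    // SELECT COUNT(*) : an empty projection list, projectNone() reports true,
    // so the scan need not materialize any data columns at all.
    ScanLevelProjection empty = new ScanLevelProjection(
        RowSetTestUtils.projectList(),
        ScanTestUtils.parsers());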
- */ - - @Test - public void testEmptyProjection() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList(), - ScanTestUtils.parsers()); - - assertFalse(scanProj.projectAll()); - assertTrue(scanProj.projectNone()); - assertEquals(0, scanProj.requestedCols().size()); - } - - /** - * Can't include both a wildcard and a column name. - */ - - @Test - public void testErrorWildcardAndColumns() { - try { - new ScanLevelProjection( - RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"), - ScanTestUtils.parsers()); - fail(); - } catch (IllegalArgumentException e) { - // Expected - } - } - - /** - * Can't include both a column name and a wildcard. - */ - - @Test - public void testErrorColumnAndWildcard() { - try { - new ScanLevelProjection( - RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR), - ScanTestUtils.parsers()); - fail(); - } catch (IllegalArgumentException e) { - // Expected - } - } - - /** - * Can't include a wildcard twice. - * <p> - * Note: Drill actually allows this, but the work should be done - * in the project operator; scan should see at most one wildcard. - */ - - @Test - public void testErrorTwoWildcards() { - try { - new ScanLevelProjection( - RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR), - ScanTestUtils.parsers()); - fail(); - } catch (UserException e) { - // Expected - } - } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java index 33752a746..386849bb6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperatorExec.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; @@ -54,6 +55,7 @@ import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Test of the scan operator framework. Here the focus is on the @@ -66,6 +68,7 @@ import org.junit.Test; * appear elsewhere. 
*/ +@Category(RowSetTests.class) public class TestScanOperatorExec extends SubOperatorTest { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestScanOperatorExec.class); @@ -187,7 +190,7 @@ public class TestScanOperatorExec extends SubOperatorTest { .add("a", MinorType.INT) .addNullable("b", MinorType.VARCHAR, 10) .buildSchema(); - schemaNegotiator.setTableSchema(schema); + schemaNegotiator.setTableSchema(schema, true); tableLoader = schemaNegotiator.build(); return true; } @@ -213,7 +216,7 @@ public class TestScanOperatorExec extends SubOperatorTest { .add("a", MinorType.VARCHAR) .addNullable("b", MinorType.VARCHAR, 10) .buildSchema(); - schemaNegotiator.setTableSchema(schema); + schemaNegotiator.setTableSchema(schema, true); schemaNegotiator.build(); tableLoader = schemaNegotiator.build(); return true; @@ -1367,7 +1370,7 @@ public class TestScanOperatorExec extends SubOperatorTest { TupleMetadata schema = new SchemaBuilder() .add("a", MinorType.VARCHAR) .buildSchema(); - schemaNegotiator.setTableSchema(schema); + schemaNegotiator.setTableSchema(schema, true); tableLoader = schemaNegotiator.build(); return true; } @@ -1493,7 +1496,7 @@ public class TestScanOperatorExec extends SubOperatorTest { TupleMetadata schema = new SchemaBuilder() .add("a", MinorType.INT) .buildSchema(); - schemaNegotiator.setTableSchema(schema); + schemaNegotiator.setTableSchema(schema, true); tableLoader = schemaNegotiator.build(); return true; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java index d8c9a65ef..ef79b0ee6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorEarlySchema.java @@ -22,12 +22,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.impl.protocol.SchemaTracker; +import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; import org.apache.drill.exec.record.BatchSchema; @@ -37,8 +38,9 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.RowSetUtilities; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Test the early-schema support of the scan orchestrator. "Early schema" @@ -49,6 +51,7 @@ import org.junit.Test; * that tests for lower-level components have already passed. 
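Throughout the hunks above, the one-argument schemaNegotiator.setTableSchema(schema) calls gain a second boolean argument. A minimal sketch of the usage as it appears in those hunks; the meaning attributed to the flag is an assumption, not something stated in this diff:

    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addNullable("b", MinorType.VARCHAR, 10)
        .buildSchema();
    // The added boolean is assumed to declare the schema complete, so the
    // framework can treat it as final rather than subject to later additions.
    schemaNegotiator.setTableSchema(schema, true);
    tableLoader = schemaNegotiator.build();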
*/ +@Category(RowSetTests.class) public class TestScanOrchestratorEarlySchema extends SubOperatorTest { /** @@ -78,16 +81,17 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ResultSetLoader loader = reader.makeTableLoader(tableSchema); - // Schema provided, so an empty batch is available to - // send downstream. + // Simulate a first reader in a scan that can provide an + // empty batch to define schema. { + reader.defineSchema(); SingleRowSet expected = fixture.rowSetBuilder(tableSchema) .build(); assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } // Create a batch of data. @@ -107,8 +111,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(2, "wilma") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } // Second batch. @@ -128,8 +132,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(4, "betty") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } // Explicit reader close. (All other tests are lazy, they @@ -167,16 +171,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ResultSetLoader loader = reader.makeTableLoader(tableSchema); - // Verify empty batch. - - { - SingleRowSet expected = fixture.rowSetBuilder(tableSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + // Don't bother with an empty batch here or in other tests. + // Simulates the second reader in a scan. // Create a batch of data. @@ -188,15 +184,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(tableSchema) - .addRow(1, "fred") - .addRow(2, "wilma") - .build(); + SingleRowSet expected = fixture.rowSetBuilder(tableSchema) + .addRow(1, "fred") + .addRow(2, "wilma") + .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } @@ -228,20 +222,10 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ResultSetLoader loader = reader.makeTableLoader(tableSchema); - // Verify empty batch. - BatchSchema expectedSchema = new SchemaBuilder() .add("b", MinorType.VARCHAR) .add("a", MinorType.INT) .build(); - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. 
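The recurring replacement of new RowSetComparison(expected).verifyAndClearAll(actual) with RowSetUtilities.verify(expected, actual) throughout these hunks reads as a convenience wrapper. A sketch of the assumed equivalence; the body shown here is an assumption, not taken from the RowSetUtilities source:

    // Assumed shorthand for the compare-then-clean-up idiom used previously.
    public static void verify(RowSet expected, RowSet actual) {
      new RowSetComparison(expected).verifyAndClearAll(actual);
    }

    // So the two forms in these hunks are interchangeable:
    RowSetUtilities.verify(expected, fixture.wrap(scanner.output()));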
@@ -253,17 +237,15 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow("fred", 1) - .addRow("wilma", 2) - .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("fred", 1) + .addRow("wilma", 2) + .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); - scanner.close(); + scanner.close(); } /** @@ -294,21 +276,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ResultSetLoader loader = reader.makeTableLoader(tableSchema); - // Verify empty batch - BatchSchema expectedSchema = new SchemaBuilder() .add("a", MinorType.INT) .add("b", MinorType.VARCHAR) .addNullable("c", MinorType.INT) .build(); - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. @@ -320,15 +292,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(1, "fred", null) - .addRow(2, "wilma", null) - .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(1, "fred", null) + .addRow(2, "wilma", null) + .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } @@ -369,21 +339,11 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ResultSetLoader loader = reader.makeTableLoader(tableSchema); - // Verify empty batch - BatchSchema expectedSchema = new SchemaBuilder() .add("a", MinorType.INT) .add("b", MinorType.VARCHAR) .addNullable("c", MinorType.VARCHAR) .build(); - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. @@ -395,15 +355,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(1, "fred", null) - .addRow(2, "wilma", null) - .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(1, "fred", null) + .addRow(2, "wilma", null) + .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } @@ -440,19 +398,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { assertFalse(loader.writer().column("b").schema().isProjected()); - // Verify empty batch. - BatchSchema expectedSchema = new SchemaBuilder() .add("a", MinorType.INT) .build(); - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. 
@@ -464,15 +412,13 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow(1) - .addRow(2) - .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(1) + .addRow(2) + .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } @@ -516,16 +462,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { BatchSchema expectedSchema = new SchemaBuilder() .build(); - { - // Expect an empty schema - - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. @@ -545,8 +481,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow() .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } // Fast path to fill in empty rows @@ -592,18 +528,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { reader.makeTableLoader(tableSchema); - // Schema provided, so an empty batch is available to - // send downstream. - - { - SingleRowSet expected = fixture.rowSetBuilder(tableSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } - // Create a batch of data. Because there are no columns, it does // not make sense to ready any rows. @@ -616,8 +540,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { SingleRowSet expected = fixture.rowSetBuilder(tableSchema) .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } scanner.close(); @@ -650,19 +574,9 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { reader.makeTableLoader(tableSchema); - // Verify initial empty batch. - BatchSchema expectedSchema = new SchemaBuilder() .addNullable("a", MinorType.INT) .build(); - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. Because there are no columns, it does // not make sense to ready any rows. @@ -672,16 +586,14 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .build(); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } + /** * The projection mechanism provides "type smoothing": null * columns prefer the type of previously-seen non-null columns. 
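A schematic illustration of the "type smoothing" idea mentioned above; the schemas here are hypothetical, chosen only to show the preference. If one reader delivers both columns and the next delivers only one, the projected-but-missing column is filled with nulls using the previously seen type, not the default nullable INT, so downstream operators see no schema change:

    // Reader 1: both columns present.
    TupleMetadata table1 = new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .buildSchema();

    // Reader 2: column b is missing. Smoothing resolves b as a nullable
    // VARCHAR (the type remembered from reader 1), keeping the scan's
    // output schema version unchanged.
    TupleMetadata table2 = new SchemaBuilder()
        .add("a", MinorType.INT)
        .buildSchema();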
@@ -718,6 +630,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ReaderSchemaOrchestrator reader = scanner.startReader(); reader.makeTableLoader(table1Schema); + reader.defineSchema(); VectorContainer output = scanner.output(); tracker.trackSchema(output); schemaVersion = tracker.schemaVersion(); @@ -737,6 +650,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .buildSchema(); ReaderSchemaOrchestrator reader = scanner.startReader(); reader.makeTableLoader(table2Schema); + reader.defineSchema(); VectorContainer output = scanner.output(); tracker.trackSchema(output); assertEquals(schemaVersion, tracker.schemaVersion()); @@ -756,6 +670,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .buildSchema(); ReaderSchemaOrchestrator reader = scanner.startReader(); reader.makeTableLoader(table3Schema); + reader.defineSchema(); VectorContainer output = scanner.output(); tracker.trackSchema(output); assertEquals(schemaVersion, tracker.schemaVersion()); @@ -776,6 +691,7 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .buildSchema(); ReaderSchemaOrchestrator reader = scanner.startReader(); reader.makeTableLoader(table2Schema); + reader.defineSchema(); VectorContainer output = scanner.output(); tracker.trackSchema(output); assertEquals(MinorType.BIGINT, output.getSchema().getColumn(0).getType().getMinorType()); @@ -843,8 +759,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(10, "fred") .addRow(20, "wilma") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(projector.output())); + RowSetUtilities.verify(expected, + fixture.wrap(projector.output())); } { // ... FROM table 2 @@ -874,8 +790,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(30, null) .addRow(40, null) .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(projector.output())); + RowSetUtilities.verify(expected, + fixture.wrap(projector.output())); } { // ... 
FROM table 3 @@ -899,8 +815,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(50, "dino") .addRow(60, "barney") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(projector.output())); + RowSetUtilities.verify(expected, + fixture.wrap(projector.output())); } projector.close(); @@ -926,9 +842,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ReaderSchemaOrchestrator reader = scanner.startReader(); ResultSetLoader loader = reader.makeTableLoader(schema1); - tracker.trackSchema(scanner.output()); - schemaVersion = tracker.schemaVersion(); - // Create a batch reader.startBatch(); @@ -936,6 +849,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow("fred") .addRow("wilma"); reader.endBatch(); + tracker.trackSchema(scanner.output()); + schemaVersion = tracker.schemaVersion(); // Verify @@ -944,8 +859,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow("wilma") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.closeReader(); } { @@ -960,8 +875,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ReaderSchemaOrchestrator reader = scanner.startReader(); ResultSetLoader loader = reader.makeTableLoader(schema2); - tracker.trackSchema(scanner.output()); - assertEquals(schemaVersion, tracker.schemaVersion()); // Create a batch @@ -973,13 +886,15 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify, using persistent schema + tracker.trackSchema(scanner.output()); + assertEquals(schemaVersion, tracker.schemaVersion()); SingleRowSet expected = fixture.rowSetBuilder(schema1) .addRow("barney") .addRow("betty") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.closeReader(); } { @@ -994,9 +909,6 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { ReaderSchemaOrchestrator reader = scanner.startReader(); ResultSetLoader loader = reader.makeTableLoader(schema3); - tracker.trackSchema(scanner.output()); - assertEquals(schemaVersion, tracker.schemaVersion()); - // Create a batch reader.startBatch(); @@ -1007,13 +919,16 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { // Verify, using persistent schema + tracker.trackSchema(scanner.output()); + assertEquals(schemaVersion, tracker.schemaVersion()); + SingleRowSet expected = fixture.rowSetBuilder(schema1) .addRow("bam-bam") .addRow("pebbles") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.closeReader(); } @@ -1073,8 +988,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(10, "fred", 110L) .addRow(20, "wilma", 110L) .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.closeReader(); } @@ -1100,8 +1015,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(30, "bambam", 330L) .addRow(40, "betty", 440L) .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } { // 
... FROM table 3 @@ -1125,8 +1040,8 @@ public class TestScanOrchestratorEarlySchema extends SubOperatorTest { .addRow(50, "dino", 550L) .addRow(60, "barney", 660L) .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } scanner.close(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java index 0cf2cba31..84ffc4e32 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorLateSchema.java @@ -18,10 +18,12 @@ package org.apache.drill.exec.physical.impl.scan; import static org.junit.Assert.assertFalse; + +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.RowSetLoader; import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; @@ -32,6 +34,7 @@ import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.Test; +import org.junit.experimental.categories.Category; /** * Test the late-schema support in the scan orchestrator. "Late schema" is the case @@ -43,6 +46,7 @@ import org.junit.Test; * that tests for lower-level components have already passed. 
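In the late-schema case described above, the reader does not know its schema until it reads data, so the table loader is created without a schema and columns are added on the fly through the row-set loader. A rough sketch under that assumption; passing null to makeTableLoader() and the columnSchema() helper are assumptions, neither is shown in this diff:

    // No schema is given up front; the loader starts empty.
    ResultSetLoader loader = reader.makeTableLoader(null);
    RowSetLoader writer = loader.writer();

    // The reader discovers a column while reading and adds it in flight.
    writer.addColumn(SchemaBuilder.columnSchema("a", MinorType.INT, DataMode.REQUIRED));
    writer.start();
    writer.scalar("a").setInt(10);
    writer.save();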
*/ +@Category(RowSetTests.class) public class TestScanOrchestratorLateSchema extends SubOperatorTest { /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java index e10055114..c7b52e2da 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorMetadata.java @@ -20,14 +20,15 @@ package org.apache.drill.exec.physical.impl.scan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.impl.protocol.SchemaTracker; import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager; +import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; import org.apache.drill.exec.record.BatchSchema; @@ -35,10 +36,10 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.RowSetUtilities; import org.apache.hadoop.fs.Path; import org.junit.Test; - +import org.junit.experimental.categories.Category; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; /** @@ -46,6 +47,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; * with implicit file columns provided by the file metadata manager. */ +@Category(RowSetTests.class) public class TestScanOrchestratorMetadata extends SubOperatorTest { /** @@ -104,8 +106,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { .addRow(2, "wilma", "/w/x/y/z.csv", "/w/x/y", "z.csv", "csv", "x", "y") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } @@ -146,19 +148,9 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { ResultSetLoader loader = reader.makeTableLoader(tableSchema); - // Verify empty batch. - BatchSchema expectedSchema = new SchemaBuilder() .addNullable("c", MinorType.INT) .build(); - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. 
@@ -170,15 +162,13 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addSingleCol(null) - .addSingleCol(null) - .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addSingleCol(null) + .addSingleCol(null) + .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } @@ -229,6 +219,7 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { // Verify empty batch. + reader.defineSchema(); BatchSchema expectedSchema = new SchemaBuilder() .add("a", MinorType.INT) .add("b", MinorType.VARCHAR) @@ -240,8 +231,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { .build(); assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } // Create a batch of data. @@ -260,8 +251,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { .addRow(2, "wilma", "x", "csv") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); } scanner.close(); @@ -302,22 +293,12 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { ResultSetLoader loader = reader.makeTableLoader(tableSchema); - // Verify empty batch. - BatchSchema expectedSchema = new SchemaBuilder() .addNullable("dir0", MinorType.VARCHAR) .add("b", MinorType.VARCHAR) .add("suffix", MinorType.VARCHAR) .addNullable("c", MinorType.INT) .build(); - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .build(); - - assertNotNull(scanner.output()); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } // Create a batch of data. @@ -329,15 +310,13 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { // Verify - { - SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) - .addRow("x", "fred", "csv", null) - .addRow("x", "wilma", "csv", null) - .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("x", "fred", "csv", null) + .addRow("x", "wilma", "csv", null) + .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); - } + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.close(); } @@ -403,8 +382,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { .addRow("x", "y", "a.csv", "fred") .addRow("x", "y", "a.csv", "wilma") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); // Do explicit close (as in real code) to avoid an implicit // close which will blow away the current file info... 
@@ -431,8 +410,8 @@ public class TestScanOrchestratorMetadata extends SubOperatorTest { .addRow("x", null, "b.csv", "bambam") .addRow("x", null, "b.csv", "betty") .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(scanner.output())); + RowSetUtilities.verify(expected, + fixture.wrap(scanner.output())); scanner.closeReader(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java deleted file mode 100644 index 021d7e381..000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaLevelProjection.java +++ /dev/null @@ -1,557 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.scan; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.List; - -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; -import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection; -import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple; -import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection; -import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn; -import org.apache.drill.exec.physical.impl.scan.project.VectorSource; -import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection; -import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; -import org.apache.drill.exec.record.metadata.SchemaBuilder; -import org.apache.drill.exec.record.metadata.TupleMetadata; -import org.apache.drill.test.SubOperatorTest; -import org.junit.Test; - -public class TestSchemaLevelProjection extends SubOperatorTest { - - @Test - public void testWildcard() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - assertEquals(1, scanProj.columns().size()); - - TupleMetadata tableSchema = new SchemaBuilder() 
- .add("a", MinorType.VARCHAR) - .addNullable("c", MinorType.INT) - .addArray("d", MinorType.FLOAT8) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new WildcardSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(3, columns.size()); - - assertEquals("a", columns.get(0).name()); - assertEquals(0, columns.get(0).sourceIndex()); - assertSame(rootTuple, columns.get(0).source()); - assertEquals("c", columns.get(1).name()); - assertEquals(1, columns.get(1).sourceIndex()); - assertSame(rootTuple, columns.get(1).source()); - assertEquals("d", columns.get(2).name()); - assertEquals(2, columns.get(2).sourceIndex()); - assertSame(rootTuple, columns.get(2).source()); - } - - /** - * Test SELECT list with columns defined in a order and with - * name case different than the early-schema table. - */ - - @Test - public void testFullList() { - - // Simulate SELECT c, b, a ... - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("c", "b", "a"), - ScanTestUtils.parsers()); - assertEquals(3, scanProj.columns().size()); - - // Simulate a data source, with early schema, of (a, b, c) - - TupleMetadata tableSchema = new SchemaBuilder() - .add("A", MinorType.VARCHAR) - .add("B", MinorType.VARCHAR) - .add("C", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(3, columns.size()); - - assertEquals("c", columns.get(0).name()); - assertEquals(2, columns.get(0).sourceIndex()); - assertSame(rootTuple, columns.get(0).source()); - - assertEquals("b", columns.get(1).name()); - assertEquals(1, columns.get(1).sourceIndex()); - assertSame(rootTuple, columns.get(1).source()); - - assertEquals("a", columns.get(2).name()); - assertEquals(0, columns.get(2).sourceIndex()); - assertSame(rootTuple, columns.get(2).source()); - } - - /** - * Test SELECT list with columns missing from the table schema. - */ - - @Test - public void testMissing() { - - // Simulate SELECT c, v, b, w ... 
- - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("c", "v", "b", "w"), - ScanTestUtils.parsers()); - assertEquals(4, scanProj.columns().size()); - - // Simulate a data source, with early schema, of (a, b, c) - - TupleMetadata tableSchema = new SchemaBuilder() - .add("A", MinorType.VARCHAR) - .add("B", MinorType.VARCHAR) - .add("C", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(4, columns.size()); - VectorSource nullBuilder = rootTuple.nullBuilder(); - - assertEquals("c", columns.get(0).name()); - assertEquals(2, columns.get(0).sourceIndex()); - assertSame(rootTuple, columns.get(0).source()); - - assertEquals("v", columns.get(1).name()); - assertEquals(0, columns.get(1).sourceIndex()); - assertSame(nullBuilder, columns.get(1).source()); - - assertEquals("b", columns.get(2).name()); - assertEquals(1, columns.get(2).sourceIndex()); - assertSame(rootTuple, columns.get(2).source()); - - assertEquals("w", columns.get(3).name()); - assertEquals(1, columns.get(3).sourceIndex()); - assertSame(nullBuilder, columns.get(3).source()); - } - - @Test - public void testSubset() { - - // Simulate SELECT c, a ... - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("c", "a"), - ScanTestUtils.parsers()); - assertEquals(2, scanProj.columns().size()); - - // Simulate a data source, with early schema, of (a, b, c) - - TupleMetadata tableSchema = new SchemaBuilder() - .add("A", MinorType.VARCHAR) - .add("B", MinorType.VARCHAR) - .add("C", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(2, columns.size()); - - assertEquals("c", columns.get(0).name()); - assertEquals(2, columns.get(0).sourceIndex()); - assertSame(rootTuple, columns.get(0).source()); - - assertEquals("a", columns.get(1).name()); - assertEquals(0, columns.get(1).sourceIndex()); - assertSame(rootTuple, columns.get(1).source()); - } - - @Test - public void testDisjoint() { - - // Simulate SELECT c, a ... - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("b"), - ScanTestUtils.parsers()); - assertEquals(1, scanProj.columns().size()); - - // Simulate a data source, with early schema, of (a) - - TupleMetadata tableSchema = new SchemaBuilder() - .add("A", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(1, columns.size()); - VectorSource nullBuilder = rootTuple.nullBuilder(); - - assertEquals("b", columns.get(0).name()); - assertEquals(0, columns.get(0).sourceIndex()); - assertSame(nullBuilder, columns.get(0).source()); - } - - @Test - public void testOmittedMap() { - - // Simulate SELECT a, b.c.d ... 
- - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a", "b.c.d"), - ScanTestUtils.parsers()); - assertEquals(2, scanProj.columns().size()); - { - assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType()); - UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1)); - assertTrue(bCol.element().isTuple()); - } - - // Simulate a data source, with early schema, of (a) - - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(2, columns.size()); - - // Should have resolved a to a table column, b to a missing map. - - // A is projected - - ResolvedColumn aCol = columns.get(0); - assertEquals("a", aCol.name()); - assertEquals(ResolvedTableColumn.ID, aCol.nodeType()); - - // B is not projected, is implicitly a map - - ResolvedColumn bCol = columns.get(1); - assertEquals("b", bCol.name()); - assertEquals(ResolvedMapColumn.ID, bCol.nodeType()); - - ResolvedMapColumn bMap = (ResolvedMapColumn) bCol; - ResolvedTuple bMembers = bMap.members(); - assertNotNull(bMembers); - assertEquals(1, bMembers.columns().size()); - - // C is a map within b - - ResolvedColumn cCol = bMembers.columns().get(0); - assertEquals(ResolvedMapColumn.ID, cCol.nodeType()); - - ResolvedMapColumn cMap = (ResolvedMapColumn) cCol; - ResolvedTuple cMembers = cMap.members(); - assertNotNull(cMembers); - assertEquals(1, cMembers.columns().size()); - - // D is an unknown column type (not a map) - - ResolvedColumn dCol = cMembers.columns().get(0); - assertEquals(ResolvedNullColumn.ID, dCol.nodeType()); - } - - /** - * Test of a map with missing columns. - * table of (a{b, c}), project a.c, a.d, a.e.f - */ - - @Test - public void testOmittedMapMembers() { - - // Simulate SELECT a.c, a.d, a.e.f ... 
- - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"), - ScanTestUtils.parsers()); - assertEquals(3, scanProj.columns().size()); - - // Simulate a data source, with early schema, of (x, y, a{b, c}) - - TupleMetadata tableSchema = new SchemaBuilder() - .add("x", MinorType.VARCHAR) - .add("y", MinorType.INT) - .addMap("a") - .add("b", MinorType.BIGINT) - .add("c", MinorType.FLOAT8) - .resumeSchema() - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(3, columns.size()); - - // Should have resolved a.b to a map column, - // a.d to a missing nested map, and a.e.f to a missing - // nested map member - - // X is projected - - ResolvedColumn xCol = columns.get(0); - assertEquals("x", xCol.name()); - assertEquals(ResolvedTableColumn.ID, xCol.nodeType()); - assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source()); - assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex()); - - // Y is projected - - ResolvedColumn yCol = columns.get(2); - assertEquals("y", yCol.name()); - assertEquals(ResolvedTableColumn.ID, yCol.nodeType()); - assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source()); - assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex()); - - // A is projected - - ResolvedColumn aCol = columns.get(1); - assertEquals("a", aCol.name()); - assertEquals(ResolvedMapColumn.ID, aCol.nodeType()); - - ResolvedMapColumn aMap = (ResolvedMapColumn) aCol; - ResolvedTuple aMembers = aMap.members(); - assertFalse(aMembers.isSimpleProjection()); - assertNotNull(aMembers); - assertEquals(3, aMembers.columns().size()); - - // a.c is projected - - ResolvedColumn acCol = aMembers.columns().get(0); - assertEquals("c", acCol.name()); - assertEquals(ResolvedTableColumn.ID, acCol.nodeType()); - assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex()); - - // a.d is not in the table, is null - - ResolvedColumn adCol = aMembers.columns().get(1); - assertEquals("d", adCol.name()); - assertEquals(ResolvedNullColumn.ID, adCol.nodeType()); - - // a.e is not in the table, is implicitly a map - - ResolvedColumn aeCol = aMembers.columns().get(2); - assertEquals("e", aeCol.name()); - assertEquals(ResolvedMapColumn.ID, aeCol.nodeType()); - - ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol; - ResolvedTuple aeMembers = aeMap.members(); - assertFalse(aeMembers.isSimpleProjection()); - assertNotNull(aeMembers); - assertEquals(1, aeMembers.columns().size()); - - // a.d.f is a null column - - ResolvedColumn aefCol = aeMembers.columns().get(0); - assertEquals("f", aefCol.name()); - assertEquals(ResolvedNullColumn.ID, aefCol.nodeType()); - } - - /** - * Simple map project. This is an internal case in which the - * query asks for a set of columns inside a map, and the table - * loader produces exactly that set. No special projection is - * needed, the map is projected as a whole. - */ - - @Test - public void testSimpleMapProject() { - - // Simulate SELECT a.b, a.c ... 
- - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a.b", "a.c"), - ScanTestUtils.parsers()); - assertEquals(1, scanProj.columns().size()); - - // Simulate a data source, with early schema, of (a{b, c}) - - TupleMetadata tableSchema = new SchemaBuilder() - .addMap("a") - .add("b", MinorType.BIGINT) - .add("c", MinorType.FLOAT8) - .resumeSchema() - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(1, columns.size()); - - // Should have resolved the whole map a to a single table column, - // since the projected members (a.b, a.c) exactly match the - // table's map - - // a is projected as a vector, not as a structured map - - ResolvedColumn aCol = columns.get(0); - assertEquals("a", aCol.name()); - assertEquals(ResolvedTableColumn.ID, aCol.nodeType()); - assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source()); - assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex()); - } - - /** - * Project of a non-map as a map - */ - - @Test - public void testMapMismatch() { - - // Simulate SELECT a.b ... - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a.b"), - ScanTestUtils.parsers()); - - // Simulate a data source, with early schema, of (a) - // where a is not a map. - - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - try { - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - fail(); - } catch (UserException e) { - // Expected - } - } - - /** - * Test project of an array. At the scan level, we just verify - * that the requested column is, indeed, an array. - */ - - @Test - public void testArrayProject() { - - // Simulate SELECT a[0] ... - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a[0]"), - ScanTestUtils.parsers()); - - // Simulate a data source, with early schema, of (a) - // where a is an array. - - TupleMetadata tableSchema = new SchemaBuilder() - .addArray("a", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(1, columns.size()); - - ResolvedColumn aCol = columns.get(0); - assertEquals("a", aCol.name()); - assertEquals(ResolvedTableColumn.ID, aCol.nodeType()); - assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source()); - assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex()); - } - - /** - * Project of a non-array as an array - */ - - @Test - public void testArrayMismatch() { - - // Simulate SELECT a[0] ... - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList("a[0]"), - ScanTestUtils.parsers()); - - // Simulate a data source, with early schema, of (a) - // where a is not an array. 
- - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.VARCHAR) - .buildSchema(); - - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - try { - new ExplicitSchemaProjection( - scanProj, tableSchema, rootTuple, - ScanTestUtils.resolvers()); - fail(); - } catch (UserException e) { - // Expected - } - } -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java deleted file mode 100644 index a30321b0d..000000000 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestSchemaSmoothing.java +++ /dev/null @@ -1,946 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.scan; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.List; - -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.impl.protocol.SchemaTracker; -import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn; -import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager; -import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection; -import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; -import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; -import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother; -import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException; -import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection; -import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection; -import org.apache.drill.exec.physical.rowSet.ResultSetLoader; -import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; -import org.apache.drill.exec.record.metadata.SchemaBuilder; -import org.apache.drill.exec.record.metadata.TupleMetadata; -import org.apache.drill.test.SubOperatorTest; -import 
org.apache.drill.test.rowSet.RowSet.SingleRowSet; -import org.apache.drill.test.rowSet.RowSetComparison; -import org.apache.hadoop.fs.Path; -import org.junit.Test; - -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; - -/** - * Tests schema smoothing at the schema projection level. - * This level handles reusing prior types when filling null - * values. But, because no actual vectors are involved, it - * does not handle the schema chosen for a table ahead of - * time, only the schema as it is merged with prior schema to - * detect missing columns. - * <p> - * Focuses on the <tt>SmoothingProjection</tt> class itself. - * <p> - * Note that, at present, schema smoothing does not work for entire - * maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt> - * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does - * not currently know how to recreate the map. The same is true of - * lists and unions. Handling such cases is complex and is probably - * better handled via a system that allows the user to specify their - * intent by providing a schema to apply to the two files. - */ - -public class TestSchemaSmoothing extends SubOperatorTest { - - /** - * Sanity test for the simple, discrete case. The purpose of - * discrete is just to run the basic lifecycle in a way that - * is compatible with the schema-persistence version. - */ - - @Test - public void testDiscrete() { - - // Set up the file metadata manager - - Path filePathA = new Path("hdfs:///w/x/y/a.csv"); - Path filePathB = new Path("hdfs:///w/x/y/b.csv"); - FileMetadataManager metadataManager = new FileMetadataManager( - fixture.getOptionManager(), - new Path("hdfs:///w"), - Lists.newArrayList(filePathA, filePathB)); - - // Set up the scan level projection - - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"), - ScanTestUtils.parsers(metadataManager.projectionParser())); - - { - // Define a file a.csv - - metadataManager.startFile(filePathA); - - // Build the output schema from the (a, b) table schema - - TupleMetadata twoColSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .addNullable("b", MinorType.VARCHAR, 10) - .buildSchema(); - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, twoColSchema, rootTuple, - ScanTestUtils.resolvers(metadataManager)); - - // Verify the full output schema - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("filename", MinorType.VARCHAR) - .add("a", MinorType.INT) - .addNullable("b", MinorType.VARCHAR, 10) - .buildSchema(); - - // Verify - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(3, columns.size()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name()); - assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value()); - assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType()); - } - { - // Define a file b.csv - - metadataManager.startFile(filePathB); - - // Build the output schema from the (a) table schema - - TupleMetadata oneColSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .buildSchema(); - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new ExplicitSchemaProjection( - scanProj, oneColSchema, rootTuple, - ScanTestUtils.resolvers(metadataManager)); - - // 
Verify the full output schema - // Since this mode is "discrete", we don't remember the type - // of the missing column. (Instead, it is filled in at the - // vector level as part of vector persistence.) During projection, it is - // marked with type NULL so that the null column builder will fill in - // the proper type. - - TupleMetadata expectedSchema = new SchemaBuilder() - .add("filename", MinorType.VARCHAR) - .add("a", MinorType.INT) - .addNullable("b", MinorType.NULL) - .buildSchema(); - - // Verify - - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals(3, columns.size()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name()); - assertEquals("b.csv", ((FileMetadataColumn) columns.get(0)).value()); - assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType()); - assertEquals(ResolvedNullColumn.ID, columns.get(2).nodeType()); - } - } - - /** - * Low-level test of the smoothing projection, including the exceptions - * it throws when things are not going its way. - */ - - @Test - public void testSmoothingProjection() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - // Table 1: (a: nullable bigint, b) - - TupleMetadata schema1 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .addNullable("b", MinorType.VARCHAR) - .add("c", MinorType.FLOAT8) - .buildSchema(); - ResolvedRow priorSchema; - { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new WildcardSchemaProjection( - scanProj, schema1, rootTuple, - ScanTestUtils.resolvers()); - priorSchema = rootTuple; - } - - // Table 2: (a: nullable bigint, c), column omitted, original schema preserved - - TupleMetadata schema2 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .add("c", MinorType.FLOAT8) - .buildSchema(); - try { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new SmoothingProjection( - scanProj, schema2, priorSchema, rootTuple, - ScanTestUtils.resolvers()); - assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple))); - priorSchema = rootTuple; - } catch (IncompatibleSchemaException e) { - fail(); - } - - // Table 3: (a, c, d), column added, must replan schema - - TupleMetadata schema3 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .addNullable("b", MinorType.VARCHAR) - .add("c", MinorType.FLOAT8) - .add("d", MinorType.INT) - .buildSchema(); - try { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new SmoothingProjection( - scanProj, schema3, priorSchema, rootTuple, - ScanTestUtils.resolvers()); - fail(); - } catch (IncompatibleSchemaException e) { - // Expected - } - - // Table 4: (a: double), change type must replan schema - - TupleMetadata schema4 = new SchemaBuilder() - .addNullable("a", MinorType.FLOAT8) - .addNullable("b", MinorType.VARCHAR) - .add("c", MinorType.FLOAT8) - .buildSchema(); - try { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new SmoothingProjection( - scanProj, schema4, priorSchema, rootTuple, - ScanTestUtils.resolvers()); - fail(); - } catch (IncompatibleSchemaException e) { - // Expected - } - -// // Table 5: (a: not-nullable bigint): convert to nullable for consistency -// -// TupleMetadata schema5 = 
new SchemaBuilder() -// .addNullable("a", MinorType.BIGINT) -// .add("c", MinorType.FLOAT8) -// .buildSchema(); -// try { -// SmoothingProjection schemaProj = new SmoothingProjection( -// scanProj, schema5, dummySource, dummySource, -// new ArrayList<>(), priorSchema); -// assertTrue(schema1.isEquivalent(ScanTestUtils.schema(schemaProj.columns()))); -// } catch (IncompatibleSchemaException e) { -// fail(); -// } - - // Table 6: Drop a non-nullable column, must replan - - TupleMetadata schema6 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .addNullable("b", MinorType.VARCHAR) - .buildSchema(); - try { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - new SmoothingProjection( - scanProj, schema6, priorSchema, rootTuple, - ScanTestUtils.resolvers()); - fail(); - } catch (IncompatibleSchemaException e) { - // Expected - } - } - - /** - * Case in which the table schema is a superset of the prior - * schema. Discard prior schema. Turn off auto expansion of - * metadata for a simpler test. - */ - - @Test - public void testSmaller() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - - { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - smoother.resolve(priorSchema, rootTuple); - assertEquals(1, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); - } - { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - smoother.resolve(tableSchema, rootTuple); - assertEquals(2, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); - } - } - - /** - * Case in which the table schema and prior are disjoint - * sets. Discard the prior schema. - */ - - @Test - public void testDisjoint() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .add("b", MinorType.VARCHAR) - .buildSchema(); - - { - doResolve(smoother, priorSchema); - } - { - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(2, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); - } - } - - private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) { - NullColumnBuilder builder = new NullColumnBuilder(null, false); - ResolvedRow rootTuple = new ResolvedRow(builder); - smoother.resolve(schema, rootTuple); - return rootTuple; - } - - /** - * Column names match, but types differ. Discard the prior schema. 
- */ - - @Test - public void testDifferentTypes() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .addNullable("b", MinorType.VARCHAR) - .buildSchema(); - - { - doResolve(smoother, priorSchema); - } - { - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(2, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); - } - } - - /** - * The prior and table schemas are identical. Preserve the prior - * schema (though, the output is no different than if we discarded - * the prior schema...) - */ - - @Test - public void testSameSchemas() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - - { - doResolve(smoother, priorSchema); - } - { - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(1, smoother.schemaVersion()); - TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple); - assertTrue(actualSchema.isEquivalent(tableSchema)); - assertTrue(actualSchema.isEquivalent(priorSchema)); - } - } - - /** - * The prior and table schemas are identical, but the cases of names differ. - * Preserve the case of the first schema. - */ - - @Test - public void testDifferentCase() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .add("A", MinorType.INT) - .add("B", MinorType.VARCHAR) - .buildSchema(); - - { - doResolve(smoother, priorSchema); - } - { - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(1, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); - List<ResolvedColumn> columns = rootTuple.columns(); - assertEquals("a", columns.get(0).name()); - } - } - - /** - * Can't preserve the prior schema if it had required columns - * where the new schema has no columns. 
- */ - - @Test - public void testRequired() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .addNullable("b", MinorType.VARCHAR) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .addNullable("b", MinorType.VARCHAR) - .buildSchema(); - - { - doResolve(smoother, priorSchema); - } - { - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(2, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); - } - } - - /** - * Preserve the prior schema if table is a subset and missing columns - * are nullable or repeated. - */ - - @Test - public void testMissingNullableColumns() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .addNullable("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .addArray("c", MinorType.BIGINT) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .add("b", MinorType.VARCHAR) - .buildSchema(); - - { - doResolve(smoother, priorSchema); - } - { - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(1, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); - } - } - - /** - * Preserve the prior schema if table is a subset. Map the table - * columns to the output using the prior schema ordering. - */ - - @Test - public void testReordering() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - TupleMetadata priorSchema = new SchemaBuilder() - .addNullable("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .addArray("c", MinorType.BIGINT) - .buildSchema(); - TupleMetadata tableSchema = new SchemaBuilder() - .add("b", MinorType.VARCHAR) - .addNullable("a", MinorType.INT) - .buildSchema(); - - { - doResolve(smoother, priorSchema); - } - { - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(1, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); - } - } - - /** - * If using the legacy wildcard expansion, reuse schema if partition paths - * are the same length. 
- */ - - @Test - public void testSamePartitionLength() { - - // Set up the file metadata manager - - Path filePathA = new Path("hdfs:///w/x/y/a.csv"); - Path filePathB = new Path("hdfs:///w/x/y/b.csv"); - FileMetadataManager metadataManager = new FileMetadataManager( - fixture.getOptionManager(), - new Path("hdfs:///w"), - Lists.newArrayList(filePathA, filePathB)); - - // Set up the scan level projection - - ScanLevelProjection scanProj = new ScanLevelProjection( - ScanTestUtils.projectAllWithMetadata(2), - ScanTestUtils.parsers(metadataManager.projectionParser())); - - // Define the schema smoother - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers(metadataManager)); - - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - - TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2); - { - metadataManager.startFile(filePathA); - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - } - { - metadataManager.startFile(filePathB); - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(1, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - } - } - - /** - * If using the legacy wildcard expansion, reuse schema if the new partition path - * is shorter than the previous. (Unneeded partitions will be set to null by the - * scan projector.) - */ - - @Test - public void testShorterPartitionLength() { - - // Set up the file metadata manager - - Path filePathA = new Path("hdfs:///w/x/y/a.csv"); - Path filePathB = new Path("hdfs:///w/x/b.csv"); - FileMetadataManager metadataManager = new FileMetadataManager( - fixture.getOptionManager(), - new Path("hdfs:///w"), - Lists.newArrayList(filePathA, filePathB)); - - // Set up the scan level projection - - ScanLevelProjection scanProj = new ScanLevelProjection( - ScanTestUtils.projectAllWithMetadata(2), - ScanTestUtils.parsers(metadataManager.projectionParser())); - - // Define the schema smoother - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers(metadataManager)); - - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - - TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2); - { - metadataManager.startFile(filePathA); - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - } - { - metadataManager.startFile(filePathB); - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(1, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - } - } - - /** - * If using the legacy wildcard expansion, we are able to use the same - * schema even if the new partition path is longer than the previous. - * Because all file names are provided up front. 
- */ - - @Test - public void testLongerPartitionLength() { - - // Set up the file metadata manager - - Path filePathA = new Path("hdfs:///w/x/a.csv"); - Path filePathB = new Path("hdfs:///w/x/y/b.csv"); - FileMetadataManager metadataManager = new FileMetadataManager( - fixture.getOptionManager(), - new Path("hdfs:///w"), - Lists.newArrayList(filePathA, filePathB)); - - // Set up the scan level projection - - ScanLevelProjection scanProj = new ScanLevelProjection( - ScanTestUtils.projectAllWithMetadata(2), - ScanTestUtils.parsers(metadataManager.projectionParser())); - - // Define the schema smoother - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers(metadataManager)); - - TupleMetadata tableSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .add("b", MinorType.VARCHAR) - .buildSchema(); - - TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2); - { - metadataManager.startFile(filePathA); - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - } - { - metadataManager.startFile(filePathB); - ResolvedRow rootTuple = doResolve(smoother, tableSchema); - assertEquals(1, smoother.schemaVersion()); - assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); - } - } - - /** - * Integrated test across multiple schemas at the batch level. - */ - - @Test - public void testSmoothableSchemaBatches() { - ScanLevelProjection scanProj = new ScanLevelProjection( - RowSetTestUtils.projectAll(), - ScanTestUtils.parsers()); - - SchemaSmoother smoother = new SchemaSmoother(scanProj, - ScanTestUtils.resolvers()); - - // Table 1: (a: bigint, b) - - TupleMetadata schema1 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .addNullable("b", MinorType.VARCHAR) - .add("c", MinorType.FLOAT8) - .buildSchema(); - { - ResolvedRow rootTuple = doResolve(smoother, schema1); - - // Just use the original schema. 
- - assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple))); - assertEquals(1, smoother.schemaVersion()); - } - - // Table 2: (a: nullable bigint, c), column omitted, original schema preserved - - TupleMetadata schema2 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .add("c", MinorType.FLOAT8) - .buildSchema(); - { - ResolvedRow rootTuple = doResolve(smoother, schema2); - assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple))); - assertEquals(1, smoother.schemaVersion()); - } - - // Table 3: (a, c, d), column added, must replan schema - - TupleMetadata schema3 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .addNullable("b", MinorType.VARCHAR) - .add("c", MinorType.FLOAT8) - .add("d", MinorType.INT) - .buildSchema(); - { - ResolvedRow rootTuple = doResolve(smoother, schema3); - assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple))); - assertEquals(2, smoother.schemaVersion()); - } - - // Table 4: Drop a non-nullable column, must replan - - TupleMetadata schema4 = new SchemaBuilder() - .addNullable("a", MinorType.BIGINT) - .addNullable("b", MinorType.VARCHAR) - .buildSchema(); - { - ResolvedRow rootTuple = doResolve(smoother, schema4); - assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple))); - assertEquals(3, smoother.schemaVersion()); - } - - // Table 5: (a: double), change type must replan schema - - TupleMetadata schema5 = new SchemaBuilder() - .addNullable("a", MinorType.FLOAT8) - .addNullable("b", MinorType.VARCHAR) - .buildSchema(); - { - ResolvedRow rootTuple = doResolve(smoother, schema5); - assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple))); - assertEquals(4, smoother.schemaVersion()); - } - -// // Table 6: (a: not-nullable bigint): convert to nullable for consistency -// -// TupleMetadata schema6 = new SchemaBuilder() -// .add("a", MinorType.FLOAT8) -// .add("b", MinorType.VARCHAR) -// .buildSchema(); -// { -// SchemaLevelProjection schemaProj = smoother.resolve(schema3, dummySource); -// assertTrue(schema5.isEquivalent(ScanTestUtils.schema(schemaProj.columns()))); -// } - } - - /** - * A SELECT * query uses the schema of the table as the output schema. - * This is trivial when the scanner has one table. But, if two or more - * tables occur, then things get interesting. The first table sets the - * schema. The second table then has: - * <ul> - * <li>The same schema, trivial case.</li> - * <li>A subset of the first table. The type of the "missing" column - * from the first table is used for a null column in the second table.</li> - * <li>A superset or disjoint set of the first schema. This triggers a hard schema - * change.</li> - * </ul> - * <p> - * It is an open question whether previous columns should be preserved on - * a hard reset. For now, the code implements, and this test verifies, that a - * hard reset clears the "memory" of prior schemas. 
- */ - - @Test - public void testWildcardSmoothing() { - ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator()); - projector.enableSchemaSmoothing(true); - projector.build(RowSetTestUtils.projectAll()); - - TupleMetadata firstSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .addNullable("b", MinorType.VARCHAR, 10) - .addNullable("c", MinorType.BIGINT) - .buildSchema(); - TupleMetadata subsetSchema = new SchemaBuilder() - .addNullable("b", MinorType.VARCHAR, 10) - .add("a", MinorType.INT) - .buildSchema(); - TupleMetadata disjointSchema = new SchemaBuilder() - .add("a", MinorType.INT) - .addNullable("b", MinorType.VARCHAR, 10) - .add("d", MinorType.VARCHAR) - .buildSchema(); - - SchemaTracker tracker = new SchemaTracker(); - int schemaVersion; - { - // First table, establishes the baseline - // ... FROM table 1 - - ReaderSchemaOrchestrator reader = projector.startReader(); - ResultSetLoader loader = reader.makeTableLoader(firstSchema); - - reader.startBatch(); - loader.writer() - .addRow(10, "fred", 110L) - .addRow(20, "wilma", 110L); - reader.endBatch(); - - tracker.trackSchema(projector.output()); - schemaVersion = tracker.schemaVersion(); - - SingleRowSet expected = fixture.rowSetBuilder(firstSchema) - .addRow(10, "fred", 110L) - .addRow(20, "wilma", 110L) - .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(projector.output())); - } - { - // Second table, same schema, the trivial case - // ... FROM table 2 - - ReaderSchemaOrchestrator reader = projector.startReader(); - ResultSetLoader loader = reader.makeTableLoader(firstSchema); - - reader.startBatch(); - loader.writer() - .addRow(70, "pebbles", 770L) - .addRow(80, "hoppy", 880L); - reader.endBatch(); - - tracker.trackSchema(projector.output()); - assertEquals(schemaVersion, tracker.schemaVersion()); - - SingleRowSet expected = fixture.rowSetBuilder(firstSchema) - .addRow(70, "pebbles", 770L) - .addRow(80, "hoppy", 880L) - .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(projector.output())); - } - { - // Third table: subset schema of first two - // ... FROM table 3 - - ReaderSchemaOrchestrator reader = projector.startReader(); - ResultSetLoader loader = reader.makeTableLoader(subsetSchema); - - reader.startBatch(); - loader.writer() - .addRow("bambam", 30) - .addRow("betty", 40); - reader.endBatch(); - - tracker.trackSchema(projector.output()); - assertEquals(schemaVersion, tracker.schemaVersion()); - - SingleRowSet expected = fixture.rowSetBuilder(firstSchema) - .addRow(30, "bambam", null) - .addRow(40, "betty", null) - .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(projector.output())); - } - { - // Fourth table: disjoint schema, cases a schema reset - // ... 
FROM table 4 - - ReaderSchemaOrchestrator reader = projector.startReader(); - ResultSetLoader loader = reader.makeTableLoader(disjointSchema); - - reader.startBatch(); - loader.writer() - .addRow(50, "dino", "supporting") - .addRow(60, "barney", "main"); - reader.endBatch(); - - tracker.trackSchema(projector.output()); - assertNotEquals(schemaVersion, tracker.schemaVersion()); - - SingleRowSet expected = fixture.rowSetBuilder(disjointSchema) - .addRow(50, "dino", "supporting") - .addRow(60, "barney", "main") - .build(); - new RowSetComparison(expected) - .verifyAndClearAll(fixture.wrap(projector.output())); - } - - projector.close(); - } - - // TODO: Test schema smoothing with repeated - // TODO: Test hard schema change - // TODO: Typed null column tests (resurrect) - // TODO: Test maps and arrays of maps -} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java index b45374be2..086da96b4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java @@ -24,14 +24,21 @@ import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.ScanTestUtils; +import org.apache.drill.exec.physical.impl.scan.file.FileMetadata; +import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn; +import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumnDefn; +import org.apache.drill.exec.physical.impl.scan.file.PartitionColumn; import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec; import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.hadoop.fs.Path; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -114,4 +121,42 @@ public class TestConstantColumnLoader extends SubOperatorTest { .verifyAndClearAll(fixture.wrap(staticLoader.load(2))); staticLoader.close(); } + + @Test + public void testFileMetadata() { + + FileMetadata fileInfo = new FileMetadata(new Path("hdfs:///w/x/y/z.csv"), new Path("hdfs:///w")); + List<ConstantColumnSpec> defns = new ArrayList<>(); + FileMetadataColumnDefn iDefn = new FileMetadataColumnDefn( + ScanTestUtils.SUFFIX_COL, ImplicitFileColumns.SUFFIX); + FileMetadataColumn iCol = new FileMetadataColumn(ScanTestUtils.SUFFIX_COL, + iDefn, fileInfo, null, 0); + defns.add(iCol); + + String partColName = ScanTestUtils.partitionColName(1); + PartitionColumn pCol = new PartitionColumn(partColName, 1, fileInfo, null, 0); + defns.add(pCol); + + ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); + ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns); + + // Create a batch + + 
staticLoader.load(2); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add(ScanTestUtils.SUFFIX_COL, MinorType.VARCHAR) + .addNullable(partColName, MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("csv", "y") + .addRow("csv", "y") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(staticLoader.load(2))); + staticLoader.close(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java index 5b49ab3c7..f40e84787 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.project; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -27,6 +28,8 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.impl.scan.ScanTestUtils; import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; +import org.apache.drill.exec.physical.rowSet.project.ImpliedTupleRequest; +import org.apache.drill.exec.physical.rowSet.project.RequestedTuple; import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn; import org.apache.drill.exec.record.metadata.ProjectionType; import org.apache.drill.test.SubOperatorTest; @@ -56,6 +59,7 @@ public class TestScanLevelProjection extends SubOperatorTest { final ScanLevelProjection scanProj = new ScanLevelProjection( RowSetTestUtils.projectList("a", "b", "c"), ScanTestUtils.parsers()); + assertFalse(scanProj.projectAll()); assertFalse(scanProj.projectNone()); @@ -72,6 +76,19 @@ public class TestScanLevelProjection extends SubOperatorTest { // Verify column type assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType()); + + // Verify tuple projection + + RequestedTuple outputProj = scanProj.rootProjection(); + assertEquals(3, outputProj.projections().size()); + assertNotNull(outputProj.get("a")); + assertTrue(outputProj.get("a").isSimple()); + + RequestedTuple readerProj = scanProj.readerProjection(); + assertEquals(3, readerProj.projections().size()); + assertNotNull(readerProj.get("a")); + assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a")); + assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d")); } /** @@ -85,6 +102,7 @@ public class TestScanLevelProjection extends SubOperatorTest { final ScanLevelProjection scanProj = new ScanLevelProjection( RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"), ScanTestUtils.parsers()); + assertFalse(scanProj.projectAll()); assertFalse(scanProj.projectNone()); @@ -107,6 +125,20 @@ public class TestScanLevelProjection extends SubOperatorTest { final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element(); assertTrue(c.isSimple()); + + // Verify tuple projection + + RequestedTuple outputProj = scanProj.rootProjection(); + assertEquals(3, outputProj.projections().size()); + assertNotNull(outputProj.get("a")); + assertTrue(outputProj.get("a").isTuple()); 
+ + RequestedTuple readerProj = scanProj.readerProjection(); + assertEquals(3, readerProj.projections().size()); + assertNotNull(readerProj.get("a")); + assertEquals(ProjectionType.TUPLE, readerProj.projectionType("a")); + assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c")); + assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("d")); } /** @@ -119,6 +151,7 @@ public class TestScanLevelProjection extends SubOperatorTest { final ScanLevelProjection scanProj = new ScanLevelProjection( RowSetTestUtils.projectList("a[1]", "a[3]"), ScanTestUtils.parsers()); + assertFalse(scanProj.projectAll()); assertFalse(scanProj.projectNone()); @@ -137,6 +170,19 @@ public class TestScanLevelProjection extends SubOperatorTest { assertTrue(a.hasIndex(1)); assertFalse(a.hasIndex(2)); assertTrue(a.hasIndex(3)); + + // Verify tuple projection + + RequestedTuple outputProj = scanProj.rootProjection(); + assertEquals(1, outputProj.projections().size()); + assertNotNull(outputProj.get("a")); + assertTrue(outputProj.get("a").isArray()); + + RequestedTuple readerProj = scanProj.readerProjection(); + assertEquals(1, readerProj.projections().size()); + assertNotNull(readerProj.get("a")); + assertEquals(ProjectionType.ARRAY, readerProj.projectionType("a")); + assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("c")); } /** @@ -165,6 +211,17 @@ public class TestScanLevelProjection extends SubOperatorTest { // Verify column type assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType()); + + // Verify tuple projection + + RequestedTuple outputProj = scanProj.rootProjection(); + assertEquals(1, outputProj.projections().size()); + assertNotNull(outputProj.get("**")); + assertTrue(outputProj.get("**").isWildcard()); + + RequestedTuple readerProj = scanProj.readerProjection(); + assertTrue(readerProj instanceof ImpliedTupleRequest); + assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a")); } /** @@ -181,38 +238,62 @@ public class TestScanLevelProjection extends SubOperatorTest { assertFalse(scanProj.projectAll()); assertTrue(scanProj.projectNone()); assertEquals(0, scanProj.requestedCols().size()); + + // Verify tuple projection + + RequestedTuple outputProj = scanProj.rootProjection(); + assertEquals(0, outputProj.projections().size()); + + RequestedTuple readerProj = scanProj.readerProjection(); + assertTrue(readerProj instanceof ImpliedTupleRequest); + assertEquals(ProjectionType.UNPROJECTED, readerProj.projectionType("a")); } /** - * Can't include both a wildcard and a column name. + * Can include both a wildcard and a column name. The Project + * operator will fill in the column, the scan framework just ignores + * the extra column. 
*/ @Test - public void testErrorWildcardAndColumns() { - try { - new ScanLevelProjection( + public void testWildcardAndColumns() { + ScanLevelProjection scanProj = new ScanLevelProjection( RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"), ScanTestUtils.parsers()); - fail(); - } catch (final IllegalArgumentException e) { - // Expected - } + + assertTrue(scanProj.projectAll()); + assertFalse(scanProj.projectNone()); + assertEquals(2, scanProj.requestedCols().size()); + assertEquals(1, scanProj.columns().size()); + + // Verify tuple projection + + RequestedTuple outputProj = scanProj.rootProjection(); + assertEquals(2, outputProj.projections().size()); + assertNotNull(outputProj.get("**")); + assertTrue(outputProj.get("**").isWildcard()); + assertNotNull(outputProj.get("a")); + + RequestedTuple readerProj = scanProj.readerProjection(); + assertTrue(readerProj instanceof ImpliedTupleRequest); + assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("a")); + assertEquals(ProjectionType.UNSPECIFIED, readerProj.projectionType("c")); } /** - * Can't include both a column name and a wildcard. + * Test a column name and a wildcard. */ @Test - public void testErrorColumnAndWildcard() { - try { - new ScanLevelProjection( + public void testColumnAndWildcard() { + ScanLevelProjection scanProj = new ScanLevelProjection( RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR), ScanTestUtils.parsers()); - fail(); - } catch (final IllegalArgumentException e) { - // Expected - } + + assertTrue(scanProj.projectAll()); + assertFalse(scanProj.projectNone()); + assertEquals(2, scanProj.requestedCols().size()); + assertEquals(1, scanProj.columns().size()); } /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java index a21b1e472..8adc0372a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java @@ -28,8 +28,9 @@ import org.apache.drill.categories.RowSetTests; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.impl.protocol.SchemaTracker; import org.apache.drill.exec.physical.impl.scan.ScanTestUtils; +import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn; +import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager; import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; @@ -38,8 +39,10 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.test.SubOperatorTest; import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.hadoop.fs.Path; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; /** * Tests schema smoothing at the schema projection level. 
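The hunks above and below add the discrete and partition-length tests to the relocated TestSchemaSmoothing. For orientation, the lifecycle those tests exercise is: build one ScanLevelProjection for the query, feed each file's table schema through a shared SchemaSmoother, and check schemaVersion() to see whether the prior schema was reused or replaced. A minimal sketch of the subset case, reusing only helpers that appear in this patch (ScanTestUtils, RowSetTestUtils, the SubOperatorTest fixture); it is illustrative and not part of the commit:

    ScanLevelProjection scanProj = new ScanLevelProjection(
        RowSetTestUtils.projectAll(),          // SELECT *
        ScanTestUtils.parsers());
    SchemaSmoother smoother = new SchemaSmoother(scanProj,
        ScanTestUtils.resolvers());

    // File 1 establishes the baseline schema (nullable a, required b).
    TupleMetadata priorSchema = new SchemaBuilder()
        .addNullable("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .buildSchema();
    ResolvedRow file1 = new ResolvedRow(new NullColumnBuilder(null, false));
    smoother.resolve(priorSchema, file1);

    // File 2 is a subset (b only): the missing nullable column is rebuilt
    // from the prior schema, so the schema version does not change.
    TupleMetadata tableSchema = new SchemaBuilder()
        .add("b", MinorType.VARCHAR)
        .buildSchema();
    ResolvedRow file2 = new ResolvedRow(new NullColumnBuilder(null, false));
    smoother.resolve(tableSchema, file2);
    assertEquals(1, smoother.schemaVersion());
    assertTrue(ScanTestUtils.schema(file2).isEquivalent(priorSchema));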
@@ -63,7 +66,7 @@ import org.junit.experimental.categories.Category; * to a fundamental limitation in Drill: * <ul> * <li>Drill cannot predict the future: each file (or batch) - * may have a different schema.</ul> + * may have a different schema.</li> * <li>Drill does not know about these differences until they * occur.</li> * <li>The scan operator is obligated to return the same schema @@ -85,6 +88,105 @@ import org.junit.experimental.categories.Category; public class TestSchemaSmoothing extends SubOperatorTest { /** + * Sanity test for the simple, discrete case. The purpose of + * discrete is just to run the basic lifecycle in a way that + * is compatible with the schema-persistence version. + */ + + @Test + public void testDiscrete() { + + // Set up the file metadata manager + + Path filePathA = new Path("hdfs:///w/x/y/a.csv"); + Path filePathB = new Path("hdfs:///w/x/y/b.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + new Path("hdfs:///w"), + Lists.newArrayList(filePathA, filePathB)); + + // Set up the scan level projection + + ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"), + ScanTestUtils.parsers(metadataManager.projectionParser())); + + { + // Define a file a.csv + + metadataManager.startFile(filePathA); + + // Build the output schema from the (a, b) table schema + + TupleMetadata twoColSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR, 10) + .buildSchema(); + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, twoColSchema, rootTuple, + ScanTestUtils.resolvers(metadataManager)); + + // Verify the full output schema + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("filename", MinorType.VARCHAR) + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR, 10) + .buildSchema(); + + // Verify + + List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(3, columns.size()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name()); + assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value()); + assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType()); + } + { + // Define a file b.csv + + metadataManager.startFile(filePathB); + + // Build the output schema from the (a) table schema + + TupleMetadata oneColSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .buildSchema(); + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, oneColSchema, rootTuple, + ScanTestUtils.resolvers(metadataManager)); + + // Verify the full output schema + // Since this mode is "discrete", we don't remember the type + // of the missing column. (Instead, it is filled in at the + // vector level as part of vector persistence.) During projection, it is + // marked with type NULL so that the null column builder will fill in + // the proper type. 
+ + TupleMetadata expectedSchema = new SchemaBuilder() + .add("filename", MinorType.VARCHAR) + .add("a", MinorType.INT) + .addNullable("b", MinorType.NULL) + .buildSchema(); + + // Verify + + List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(3, columns.size()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name()); + assertEquals("b.csv", ((FileMetadataColumn) columns.get(0)).value()); + assertEquals(ResolvedTableColumn.ID, columns.get(1).nodeType()); + assertEquals(ResolvedNullColumn.ID, columns.get(2).nodeType()); + } + } + + /** * Low-level test of the smoothing projection, including the exceptions * it throws when things are not going its way. */ @@ -463,6 +565,150 @@ public class TestSchemaSmoothing extends SubOperatorTest { assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); } } + + /** + * If using the legacy wildcard expansion, reuse schema if partition paths + * are the same length. + */ + + @Test + public void testSamePartitionLength() { + + // Set up the file metadata manager + + Path filePathA = new Path("hdfs:///w/x/y/a.csv"); + Path filePathB = new Path("hdfs:///w/x/y/b.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + new Path("hdfs:///w"), + Lists.newArrayList(filePathA, filePathB)); + + // Set up the scan level projection + + ScanLevelProjection scanProj = new ScanLevelProjection( + ScanTestUtils.projectAllWithMetadata(2), + ScanTestUtils.parsers(metadataManager.projectionParser())); + + // Define the schema smoother + + SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers(metadataManager)); + + TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + + TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2); + { + metadataManager.startFile(filePathA); + ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + } + { + metadataManager.startFile(filePathB); + ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + } + } + + /** + * If using the legacy wildcard expansion, reuse schema if the new partition path + * is shorter than the previous. (Unneeded partitions will be set to null by the + * scan projector.) 
+ */ + + @Test + public void testShorterPartitionLength() { + + // Set up the file metadata manager + + Path filePathA = new Path("hdfs:///w/x/y/a.csv"); + Path filePathB = new Path("hdfs:///w/x/b.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + new Path("hdfs:///w"), + Lists.newArrayList(filePathA, filePathB)); + + // Set up the scan level projection + + ScanLevelProjection scanProj = new ScanLevelProjection( + ScanTestUtils.projectAllWithMetadata(2), + ScanTestUtils.parsers(metadataManager.projectionParser())); + + // Define the schema smoother + + SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers(metadataManager)); + + TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + + TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2); + { + metadataManager.startFile(filePathA); + ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + } + { + metadataManager.startFile(filePathB); + ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + } + } + + /** + * If using the legacy wildcard expansion, we are able to use the same + * schema even if the new partition path is longer than the previous. + * Because all file names are provided up front. + */ + + @Test + public void testLongerPartitionLength() { + + // Set up the file metadata manager + + Path filePathA = new Path("hdfs:///w/x/a.csv"); + Path filePathB = new Path("hdfs:///w/x/y/b.csv"); + FileMetadataManager metadataManager = new FileMetadataManager( + fixture.getOptionManager(), + new Path("hdfs:///w"), + Lists.newArrayList(filePathA, filePathB)); + + // Set up the scan level projection + + ScanLevelProjection scanProj = new ScanLevelProjection( + ScanTestUtils.projectAllWithMetadata(2), + ScanTestUtils.parsers(metadataManager.projectionParser())); + + // Define the schema smoother + + SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers(metadataManager)); + + TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + + TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2); + { + metadataManager.startFile(filePathA); + ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + } + { + metadataManager.startFile(filePathB); + ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema)); + } + } + /** * Integrated test across multiple schemas at the batch level. */ |
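The three partition-length tests added above share the same wildcard-plus-file-metadata setup; only the file paths differ. A condensed sketch of that setup, using only helpers visible in this patch (FileMetadataManager, ScanTestUtils, and the test class's doResolve helper), intended purely as a reading aid rather than additional commit content:

    Path fileA = new Path("hdfs:///w/x/y/a.csv");
    Path fileB = new Path("hdfs:///w/x/b.csv");    // shorter partition path
    FileMetadataManager metadataManager = new FileMetadataManager(
        fixture.getOptionManager(),
        new Path("hdfs:///w"),                     // scan root
        Lists.newArrayList(fileA, fileB));

    // Wildcard projection expanded with two partition (dir) columns.
    ScanLevelProjection scanProj = new ScanLevelProjection(
        ScanTestUtils.projectAllWithMetadata(2),
        ScanTestUtils.parsers(metadataManager.projectionParser()));
    SchemaSmoother smoother = new SchemaSmoother(scanProj,
        ScanTestUtils.resolvers(metadataManager));

    TupleMetadata tableSchema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .buildSchema();
    TupleMetadata expectedSchema =
        ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);

    // Each file is announced to the metadata manager, then resolved through
    // the shared smoother; the expanded schema (and schema version) is reused
    // even though fileB has one fewer partition directory.
    metadataManager.startFile(fileA);
    doResolve(smoother, tableSchema);
    metadataManager.startFile(fileB);
    ResolvedRow rootTuple = doResolve(smoother, tableSchema);
    assertEquals(1, smoother.schemaVersion());
    assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));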