Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec')
22 files changed, 831 insertions, 323 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java index bbf12d4b8..b366d34d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java @@ -238,8 +238,11 @@ class ReaderState { /** * Prepare the schema for this reader. Called for the first reader within a - * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>. If - * this is an early-schema reader, then the result set loader already has + * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>. + * Asks the reader if it can provide a schema-only empty batch by calling + * the reader's <tt>defineSchema()</tt> method. If this is an early-schema + * reader, and it can provide a schema, then it should create an empty + * batch so that the result set loader already has * the proper value vectors set up. If this is a late-schema reader, we must * read one batch to get the schema, then set aside the data for the next * call to <tt>next()</tt>. @@ -255,9 +258,10 @@ class ReaderState { * <li>If if turned out that the file was * empty when trying to read the schema, <tt>open()</tt> returned false * and this method should never be called.</tt> - * <li>Otherwise, if a schema was available, then the schema is already - * set up in the result set loader as the result of schema negotiation, and - * this method simply returns <tt>true</tt>. + * <li>Otherwise, the reader does not know if it is the first reader or + * not. The call to <tt>defineSchema()</tt> notifies the reader that it + * is the first one. The reader should set up the result set loader + * with an empty batch. * </ul> * <p> * Semantics for late-schema readers: @@ -280,14 +284,12 @@ class ReaderState { protected boolean buildSchema() { - VectorContainer container = reader.output(); - - if (container != null) { + if (reader.defineSchema()) { // Bind the output container to the output of the scan operator. // This returns an empty batch with the schema filled in. - scanOp.containerAccessor.setContainer(container); + scanOp.containerAccessor.setContainer(reader.output()); schemaVersion = reader.schemaVersion(); return true; } @@ -297,7 +299,8 @@ class ReaderState { if (! next()) { return false; } - container = reader.output(); + VectorContainer container = reader.output(); + schemaVersion = reader.schemaVersion(); if (container.getRecordCount() == 0) { return true; } @@ -374,8 +377,7 @@ class ReaderState { private boolean readBatch() { - // Try to read a batch. This may fail. If so, clean up the - // mess. + // Try to read a batch. This may fail. If so, clean up the mess. boolean more; try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java index c0985b538..61de584f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java @@ -84,8 +84,6 @@ import org.apache.drill.exec.record.VectorContainer; public interface RowBatchReader { - enum Result { OK, LAST_BATCH, EOF } - /** * Name used when reporting errors. Can simply be the class name.
* @@ -111,6 +109,22 @@ public interface RowBatchReader { boolean open(); /** + * Called for the first reader within a scan. Allows the reader to + * provide an empty batch with only the schema filled in. Readers that + * are "early schema" (know the schema up front) should return true + * and create an empty batch. Readers that are "late schema" should + * return false. In that case, the scan operator will ask the reader + * to load an actual data batch, and infer the schema from that batch. + * <p> + * This step is optional and is purely for performance. + * + * @return true if this reader can define (and has defined) an empty batch + * to describe the schema, false otherwise + */ + + boolean defineSchema(); + + /** * Read the next batch. Reading continues until either EOF, * or until the mutator indicates that the batch is full. * The batch is considered valid if it is non-empty. Returning @@ -129,7 +143,7 @@ * <tt>next()</tt> should be called again, <tt>false</tt> to indicate * that EOF was reached * - * @throws RutimeException (<tt>UserException</tt> preferred) if an + * @throws RuntimeException (<tt>UserException</tt> preferred) if an * error occurs that should fail the query. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java index 9e174146f..04b2c7eb3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java @@ -20,15 +20,23 @@ package org.apache.drill.exec.physical.impl.scan; import org.apache.drill.exec.ops.OperatorContext; /** - * Interface to the set of readers, and reader schema, that the - * scan operator manages. The reader factory creates and returns - * the readers to use for the scan, as determined by the specific - * physical plan. The reader factory also - * translates from the select list provided - * in the physical plan to the actual columns returned from the - * scan operator. The translation is reader-specific; this - * interface allows the scan operator to trigger various - * lifecycle events. + * Interface to the set of readers, and reader schema, that the scan operator + * manages. The reader factory creates and returns the readers to use for the + * scan, as determined by the specific physical plan. The reader factory also + * translates from the select list provided in the physical plan to the actual + * columns returned from the scan operator. The translation is reader-specific; + * this interface allows the scan operator to trigger various lifecycle events. + * <p> + * This interface decouples the scan implementation from the generic tasks + * needed to implement Drill's Volcano iterator protocol for operators, and + * Drill's schema and batch semantics. A scan implementation need only + * implement this interface to add plugin-specific scan behavior. + * <p> + * While this interface allows a wide variety of implementations, the intent is + * that most actual scanners will use the "managed" framework that handles the + * routine projection, vector management and other tasks that tend to be common + * across scanners. See {@link ScanSchemaOrchestrator} for the managed + * framework.
*/ public interface ScanOperatorEvents { @@ -46,11 +54,25 @@ public interface ScanOperatorEvents { void bind(OperatorContext context); + /** + * A scanner typically reads multiple data sources (such as files or + * file blocks.) A batch reader handles each read. This method returns + * the next reader in whatever sequence this scan defines. + * <p> + * The preferred implementation is to create each batch reader in this + * call to minimize resource usage. Production queries may read + * thousands of files or blocks, so incremental reader creation can be + * far more efficient than creating readers at the start of the scan. + * + * @return a batch reader for one of the scan elements within the + * scan physical plan for this scan operator + */ + RowBatchReader nextReader(); /** * Called when the scan operator itself is closed. Indicates that no more - * readers are available (or will be opened). + * readers are available. */ void close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java index f6c72b120..966a03901 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java @@ -35,14 +35,24 @@ import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput; * expands to `columns`.</li> * <li>If the columns array appears, then no other table columns * can appear.</li> - * <li>If the columns array appears, then the wildcard cannot also - * appear, unless that wildcard expanded to be `columns` as - * described above.</li> + * <li>Both 'columns' and the wildcard can appear for queries such + * as:<code><pre> + * select * from dfs.`multilevel/csv` + * where columns[1] < 1000</pre> + * </code></li> * <li>The query can select specific elements such as `columns`[2]. * In this case, only array elements can appear, not the unindexed * `columns` column.</li> + * <li>It is possible for `columns` to appear twice. In this case, + * the project operator will make a copy.</li> * </ul> * <p> + * To handle these cases, the general rule is: allow any number + * of wildcard or `columns` appearances in the input projection, but + * collapse them all down to a single occurrence of `columns` in the + * output projection. (Upstream code will prevent `columns` from + * appearing twice in its non-indexed form.) + * <p> * It falls to this parser to detect a not-uncommon user error, a * query such as the following:<pre><code> * SELECT max(columns[1]) AS col1 @@ -83,7 +93,8 @@ public class ColumnsArrayParser implements ScanProjectionParser { @Override public boolean parse(RequestedColumn inCol) { if (requireColumnsArray && inCol.isWildcard()) { - expandWildcard(); + createColumnsCol( + new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL)); return true; } if (! inCol.nameEquals(ColumnsArrayManager.COLUMNS_COL)) { @@ -113,41 +124,24 @@ public class ColumnsArrayParser implements ScanProjectionParser { .build(logger); } } - - // Special `columns` array column.
- - columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol); - builder.addTableColumn(columnsArrayCol); + createColumnsCol(inCol); return true; } - /** - * Query contained SELECT *, and we know that the reader supports only - * the `columns` array; go ahead and expand the wildcard to the only - * possible column. - */ + private void createColumnsCol(RequestedColumn inCol) { + + // Special `columns` array column. Allow multiple, but + // project only one. - private void expandWildcard() { if (columnsArrayCol != null) { - throw UserException - .validationError() - .message("Cannot select columns[] and `*` together") - .build(logger); + return; } - columnsArrayCol = new UnresolvedColumnsArrayColumn( - new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL)); + columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol); builder.addTableColumn(columnsArrayCol); } @Override - public void validate() { - if (builder.hasWildcard() && columnsArrayCol != null) { - throw UserException - .validationError() - .message("Cannot select `columns` and `*` together") - .build(logger); - } - } + public void validate() { } @Override public void validateColumn(ColumnProjection col) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java index f9674dc2a..ae8502b52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl.scan.file; +import java.util.HashSet; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -38,6 +40,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { private final FileMetadataManager metadataManager; private final Pattern partitionPattern; private ScanLevelProjection builder; + private final Set<Integer> referencedPartitions = new HashSet<>(); // Output @@ -64,6 +67,11 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { if (defn != null) { return buildMetadataColumn(defn, inCol); } + if (inCol.isWildcard()) { + buildWildcard(); + + // Don't consider this a match. + } return false; } @@ -80,11 +88,18 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { // Partition column - builder.addMetadataColumn( - new PartitionColumn( - inCol.name(), - Integer.parseInt(m.group(1)))); - hasImplicitCols = true; + int partitionIndex = Integer.parseInt(m.group(1)); + if (! referencedPartitions.contains(partitionIndex)) { + builder.addMetadataColumn( + new PartitionColumn( + inCol.name(), + partitionIndex)); + + // Remember the partition for later wildcard expansion + + referencedPartitions.add(partitionIndex); + hasImplicitCols = true; + } return true; } @@ -107,8 +122,52 @@ public class FileMetadataColumnsParser implements ScanProjectionParser { return true; } + private void buildWildcard() { + if (metadataManager.useLegacyWildcardExpansion && + metadataManager.useLegacyExpansionLocation) { + + // Star column: this is a SELECT * query. + + // Old-style wildcard handling inserts all partition columns in + // the scanner, removes them in Project. + // Fill in the file metadata columns. Can do here because the + // set is constant across all files. 
+ + expandPartitions(); } } + @Override - public void validate() { } + public void validate() { + + // Expand partitions if a wildcard appears, if using the + // feature to expand partitions for wildcards, and we want the + // partitions after data columns. + + if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion && + ! metadataManager.useLegacyExpansionLocation) { + expandPartitions(); + } + } + + private void expandPartitions() { + + // Legacy wildcard expansion: include the file partitions for this file. + // This is a disadvantage for a * query: files at different directory + // levels will have different numbers of columns. Would be better to + // return this data as an array at some point. + // Append this after the *, keeping the * for later expansion. + + for (int i = 0; i < metadataManager.partitionCount(); i++) { + if (referencedPartitions.contains(i)) { + continue; + } + builder.addMetadataColumn(new PartitionColumn( + metadataManager.partitionName(i), i)); + referencedPartitions.add(i); + } + hasImplicitCols = true; + } @Override public void validateColumn(ColumnProjection outCol) { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java index fe4332a34..ba49a9f54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java @@ -53,6 +53,31 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes protected final String partitionDesignator; protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>(); protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap(); + + /** + * Indicates whether to expand partition columns when the query contains a wildcard. + * Supports queries such as the following:<code><pre> + * select * from dfs.`partitioned-dir` + * </pre></code> + * In which the output columns will be (columns, dir0) if the partitioned directory + * has one level of nesting. + * + * See {@link TestImplicitFileColumns#testImplicitColumns} + */ + protected final boolean useLegacyWildcardExpansion; + + /** + * In the legacy mode described above, Drill expands partition columns whenever the + * wildcard appears. Drill 1.1 - 1.11 put expanded partition columns after + * data columns. This is actually a better position as it minimizes changes + * to the row layout for files at different depths. Drill 1.12 moved them before + * data columns: at the location of the wildcard. + * <p> + * This flag, when set, uses the Drill 1.12 position. Later enhancements + * can unset this flag to go back to the future: use the preferred location + * after other columns.
+ */ + protected final boolean useLegacyExpansionLocation; private final FileMetadataColumnsParser parser; // Internal state @@ -84,7 +109,11 @@ */ public FileMetadataManager(OptionSet optionManager, + boolean useLegacyWildcardExpansion, + boolean useLegacyExpansionLocation, Path rootDir, List<Path> files) { + this.useLegacyWildcardExpansion = useLegacyWildcardExpansion; + this.useLegacyExpansionLocation = useLegacyExpansionLocation; scanRootDir = rootDir; partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); @@ -117,6 +146,11 @@ } } + public FileMetadataManager(OptionSet optionManager, + Path rootDir, List<Path> files) { + this(optionManager, false, false, rootDir, files); + } + private int computeMaxPartition(List<Path> files) { int maxLen = 0; for (Path filePath : files) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java index 609e9f05f..6ecf0cf05 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java @@ -32,19 +32,46 @@ import org.apache.hadoop.mapred.FileSplit; /** * The file scan framework adds into the scan framework support for implicit - * file metadata columns. + * file metadata columns. The file scan framework brings together a number of + * components: + * <ul> + * <li>The projection list provided by the physical operator definition. This + * list identifies the set of "output" columns which this framework is obliged + * to produce.</li> + * <li>The set of files and/or blocks to read.</li> + * <li>The file system configuration to use for working with the files + * or blocks.</li> + * <li>The factory class to create a reader for each of the files or blocks + * defined above. (Readers are created one-by-one as files are read.)</li> + * <li>Options as defined by the base class.</li> + * </ul> + * <p> + * @see {@link AbstractScanFramework} for details. */ public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiator> { - public interface FileReaderCreator { + /** + * Creates a batch reader on demand. Unlike earlier versions of Drill, + * this framework creates readers one by one, when they are needed. + * Doing so avoids excessive resource demands that come from creating + * potentially thousands of readers up front. + * <p> + * The reader itself is unique to each file type. This interface + * provides a common interface that this framework can use to create the + * file-specific reader on demand. + */ + + public interface FileReaderFactory { ManagedReader<FileSchemaNegotiator> makeBatchReader( DrillFileSystem dfs, FileSplit split) throws ExecutionSetupException; } /** - * Implementation of the file-level schema negotiator. + * Implementation of the file-level schema negotiator. At present, no + * file-specific features exist. This class shows, however, where we would + * add such features.
*/ public static class FileSchemaNegotiatorImpl extends SchemaNegotiatorImpl @@ -55,12 +82,12 @@ public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiato } } - private final FileReaderCreator readerCreator; + private final FileReaderFactory readerCreator; public FileScanFramework(List<SchemaPath> projection, List<? extends FileWork> files, Configuration fsConf, - FileReaderCreator readerCreator) { + FileReaderFactory readerCreator) { super(projection, files, fsConf); this.readerCreator = readerCreator; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java index 4a15ff787..bf132651b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java @@ -44,4 +44,17 @@ public abstract class MetadataColumn extends ResolvedColumn implements ConstantC public String name() { return schema.getName(); } public abstract MetadataColumn resolve(FileMetadata fileInfo, VectorSource source, int sourceIndex); + + @Override + public String toString() { + return new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" schema=\"") + .append(schema.toString()) + .append(", value=") + .append(value) + .append("]") + .toString(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java index 54079013a..d285261c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java @@ -40,15 +40,91 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; * config options, and implements the matching "managed reader". All details * of setup, projection, and so on are handled by the framework and the components * that the framework builds upon. + * + * <h4>Inputs</h4> + * + * At this basic level, a scan framework requires just a few simple inputs: + * <ul> + * <li>The projection list provided by the physical operator definition. This + * list identifies the set of "output" columns which this framework is obliged + * to produce.</li> + * <li>The operator context which provides access to a memory allocator and + * other plumbing items.</li> + * <li>A method to create a reader for each of the files or blocks + * defined above. (Readers are created one-by-one as files are read.)</li> + * <li>The data type to use for projected columns which the reader cannot + * provide. (Drill allows such columns and fills in null values: traditionally + * nullable Int, but customizable here.)</li> + * <li>Various other options.</li> + * </ul> + * + * <h4>Orchestration</h4> + * + * The above is sufficient to drive the entire scan operator functionality. + * Projection is done generically and is the same for all files. Only the + * reader (created via the factory class) differs from one type of file to + * another. + * <p> + * The framework achieves the work described below by composing a large + * set of detailed classes, each of which performs some specific task. This + * structure leaves the reader to simply infer schema and read data.
+ * <p> + * In particular, rather than do all the orchestration here (which would tie + * that logic to the scan operation), the detailed work is delegated to the + * {@link ScanSchemaOrchestrator} class, with this class as a "shim" between + * the Scan events API and the schema orchestrator implementation. + * + * <h4>Reader Integration</h4> + * + * The details of how a file is structured, how a schema is inferred, how + * data is decoded: all that is encapsulated in the reader. The only real + * interaction between the reader and the framework is: + * <ul> + * <li>The reader "negotiates" a schema with the framework. The framework + * knows the projection list from the query plan, knows something about + * data types (whether a column should be scalar, a map or an array), and + * knows about the schema already defined by prior readers. The reader knows + * what schema it can produce (if "early schema.") The schema negotiator + * class handles this task.</li> + * <li>The reader reads data from the file and populates value vectors a + * batch at a time. The framework creates the result set loader to use for + * this work. The schema negotiator returns that loader to the reader, which + * uses it during read. + * <p> + * It is important to note that the result set loader also defines a schema: + * the schema requested by the reader. If the reader wants to read three + * columns, a, b, and c, then that is the schema that the result set loader + * supports. This is true even if the query plan only wants column a, or + * wants columns c, a. The framework handles the projection task so the + * reader does not have to worry about it. Reading an unwanted column + * is low cost: the result set loader will have provided a "dummy" column + * writer that simply discards the value. This is just as fast as having the + * reader use if-statements or a table to determine which columns to save. + * <p> + * A reader may be "late schema", true "schema on read." In this case, the + * reader simply tells the result set loader to create a new column + * on the fly. The framework will work out if that new column is to be + * projected and will return either a real column writer (projected column) + * or a dummy column writer (unprojected column.)</li> + * <li>The reader then reads batches of data until all data is read. The + * result set loader signals when a batch is full; the reader should not + * worry about this detail itself.</li> + * <li>The reader then releases its resources.</li> + * </ul> */ public abstract class AbstractScanFramework<T extends SchemaNegotiator> implements ScanOperatorEvents { + // Inputs + protected final List<SchemaPath> projection; protected MajorType nullType; protected int maxBatchRowCount; protected int maxBatchByteCount; protected OperatorContext context; + + // Internal state + protected ScanSchemaOrchestrator scanOrchestrator; public AbstractScanFramework(List<SchemaPath> projection) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java index 631881265..dead9cb46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java @@ -65,9 +65,13 @@ public interface SchemaNegotiator { * columns during the read.
* * @param schema the table schema if known at open time + * @param isComplete true if the schema is complete: if it can be used + * to define an empty schema-only batch for the first reader. Set to + * false if the schema is partial: if the reader must read rows to + * determine the full schema */ - void setTableSchema(TupleMetadata schema); + void setTableSchema(TupleMetadata schema, boolean isComplete); /** * Set the preferred batch size (which may be overridden by the diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java index 46f363dd5..0841049fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java @@ -53,6 +53,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { protected final AbstractScanFramework<?> basicFramework; private final ShimBatchReader<? extends SchemaNegotiator> shim; protected TupleMetadata tableSchema; + protected boolean isSchemaComplete; protected int batchSize = ValueVector.MAX_ROW_COUNT; public SchemaNegotiatorImpl(AbstractScanFramework<?> framework, ShimBatchReader<? extends SchemaNegotiator> shim) { @@ -66,8 +67,9 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { } @Override - public void setTableSchema(TupleMetadata schema) { + public void setTableSchema(TupleMetadata schema, boolean isComplete) { tableSchema = schema; + this.isSchemaComplete = schema != null && isComplete; } @Override @@ -97,4 +99,6 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator { public boolean isProjectionEmpty() { return basicFramework.scanOrchestrator().isProjectNone(); } + + public boolean isSchemaComplete() { return isSchemaComplete; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java index 0dc3c5736..a97b32968 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.framework; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.physical.impl.scan.RowBatchReader; -import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator; +import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator; import org.apache.drill.exec.physical.rowSet.ResultSetLoader; import org.apache.drill.exec.record.VectorContainer; @@ -44,6 +44,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead protected final AbstractScanFramework<T> manager; protected final ManagedReader<T> reader; protected final ReaderSchemaOrchestrator readerOrchestrator; + protected SchemaNegotiatorImpl schemaNegotiator; protected ResultSetLoader tableLoader; /** @@ -96,10 +97,19 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead } @Override + public boolean defineSchema() { + if (schemaNegotiator.isSchemaComplete()) { + readerOrchestrator.defineSchema(); + return true; + } + return false; + } + + 
@Override public boolean next() { // The reader may report EOF, but the result set loader might - // have a lookhead row. + // have a lookahead row. if (eof && ! tableLoader.hasRows()) { return false; } @@ -181,6 +191,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead } public ResultSetLoader build(SchemaNegotiatorImpl schemaNegotiator) { + this.schemaNegotiator = schemaNegotiator; readerOrchestrator.setBatchSize(schemaNegotiator.batchSize); tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.tableSchema); return tableLoader; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java index efd881bb1..096b8447b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java @@ -29,7 +29,7 @@ * is an old version without a new column c, while file B includes the column. * And so on. * <p> - * The scan operator here works to ensure schema continuity as much as + * The scan operator works to ensure schema continuity as much as * possible, smoothing out "soft" schema changes that are simply artifacts of * reading a collection of files. Only "hard" changes (true changes) are * passed downstream. @@ -157,5 +157,29 @@ * output batch in the order specified by the original SELECT list (or table order, * if the original SELECT had a wildcard.) Fortunately, this is just involves * moving around pointers to vectors; no actual data is moved during projection. + * + * <h4>Class Structure</h4> + * + * Some of the key classes here include: + * <ul> + * <li>{@link RowBatchReader} an extremely simple interface for reading data. + * We would like many developers to create new plugins and readers. The simplified + * interface pushes all complexity into the scan framework, leaving the reader to + * just read.</li> + * <li>{@link ShimBatchReader} an implementation of the above that adapts + * the simplified API, adding the structure needed to work with the result set loader. + * (The base interface is agnostic about how rows are read.)</li> + * <li>{@link SchemaNegotiator} an interface that allows a batch reader to + * "negotiate" a schema with the scan framework. The scan framework knows the + * columns that are to be projected. The reader knows what columns it can offer. + * The schema negotiator works out how to combine the two. It expresses the result + * as a result set loader. Column writers are defined for all columns that the + * reader wants to read, but only the materialized (projected) columns have actual + * vectors behind them. The non-projected columns are "free-wheeling" "dummy" + * writers. + * </li> + * </ul> + * + * And, yes, sorry for the terminology. File "readers" read from files, but + * use column "writers" to write to value vectors.
*/ package org.apache.drill.exec.physical.impl.scan.framework; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java index 3e302a1e8..d7de30ad6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java @@ -36,9 +36,90 @@ * <p> * See {@link ScanOperatorExec} for details of the scan operator protocol * and components. + * + * <h4>Traditional Class Structure</h4> + * The original design was simple but required each reader to handle many + * detailed tasks. + * <pre><code> + * +------------+ +-----------+ + * | Scan Batch | +---> | ScanBatch | + * | Creator | | +-----------+ + * +------------+ | | + * | | | + * v | | + * +------------+ | v + * | Format | ---+ +---------------+ + * | Plugin | -----> | Record Reader | + * +------------+ +---------------+ + * + * </code></pre> + * + * The scan batch creator is unique to each storage plugin and is created + * based on the physical operator configuration ("pop config"). The + * scan batch creator delegates to the format plugin to create both the + * scan batch (the scan operator) and the set of readers which the scan + * batch will manage. + * <p> + * The scan batch + * provides a <code>Mutator</code> that creates the vectors used by the + * record readers. Schema continuity comes from reusing the Mutator from one + * file/block to the next. + * <p> + * One characteristic of this system is that all the record readers are + * created up front. If we must read 1000 blocks, we'll create 1000 record + * readers. Developers must be very careful to only allocate resources when + * the reader is opened, and release resources when the reader is closed. + * Else, resource bloat becomes a large problem. + * + * <h4>Revised Class Structure</h4> + * + * The new design is more complex because it divides tasks up into separate + * classes. The class structure is larger, but each class is smaller, more + * focused and does just one task. + * <pre><code> + * +------------+ +---------------+ + * | Scan Batch | -------> | Format Plugin | + * | Creator | +---------------+ + * +------------+ / | \ + * / | \ + * +---------------------+ | \ +---------------+ + * | OperatorRecordBatch | | +---->| ScanFramework | + * +---------------------+ | | +---------------+ + * v | | + * +------------------+ | + * | ScanOperatorExec | | + * +------------------+ v + * | +--------------+ + * +----------> | Batch Reader | + * +--------------+ + * </code></pre> + * + * Here, the scan batch creator again delegates to the format plugin. The + * format plugin creates three objects: + * <ul> + * <li>The <code>OperatorRecordBatch</code>, which encapsulates the Volcano + * iterator protocol. It also holds onto the output batch. This allows the + * operator implementation to just focus on its specific job.</li> + * <li>The <code>ScanOperatorExec</code> is the operator implementation for + * the new result-set-loader based scan.</li> + * <li>The scan framework is specific to each kind of reader. It handles + * everything which is unique to that reader.
Rather than inheriting from + * the scan itself, the framework follows the strategy pattern: it says how + * to do a scan for the target format.</li> + * </ul> + * + * The overall structure uses the "composition" pattern: what is combined + * into a small set of classes in the traditional model is broken out into + * focused classes in the revised model. + * <p> + * A key part of the scan strategy is the batch reader. ("Batch" because + * it reads an entire batch at a time, using the result set loader.) The + * framework creates batch readers one by one as needed. Resource bloat + * is less of an issue because only one batch reader instance exists at + * any time for each scan operator instance. * <p> - * See the "managed" package for a reusable framework for handling the - * details of batches, schema and so on. + * Each of the above is further broken down into additional classes to + * handle projection and so on. */ package org.apache.drill.exec.physical.impl.scan; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java index 41cc59582..c0bcfa3b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; */ public class ExplicitSchemaProjection extends SchemaLevelProjection { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitSchemaProjection.class); public ExplicitSchemaProjection(ScanLevelProjection scanProj, TupleMetadata tableSchema, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java new file mode 100644 index 000000000..029b6a005 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder; +import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Orchestrates projection tasks for a single reader within the set that the + * scan operator manages. Vectors are reused across readers, but via a vector + * cache. All other state is distinct between readers. + */ + +public class ReaderSchemaOrchestrator implements VectorSource { + + private final ScanSchemaOrchestrator scanOrchestrator; + private int readerBatchSize; + private ResultSetLoaderImpl tableLoader; + private int prevTableSchemaVersion = -1; + + /** + * Assembles the table, metadata and null columns into the final output + * batch to be sent downstream. The key goal of this class is to "smooth" + * schema changes in this output batch by absorbing trivial schema changes + * that occur across readers. + */ + + private ResolvedRow rootTuple; + private VectorContainer tableContainer; + + public ReaderSchemaOrchestrator(ScanSchemaOrchestrator scanSchemaOrchestrator) { + scanOrchestrator = scanSchemaOrchestrator; + readerBatchSize = scanOrchestrator.scanBatchRecordLimit; + } + + public void setBatchSize(int size) { + if (size > 0) { + readerBatchSize = Math.min(size, scanOrchestrator.scanBatchRecordLimit); + } + } + + public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) { + OptionBuilder options = new OptionBuilder(); + options.setRowCountLimit(readerBatchSize); + options.setVectorCache(scanOrchestrator.vectorCache); + options.setBatchSizeLimit(scanOrchestrator.scanBatchByteLimit); + + // Set up a selection list if available and is a subset of + // table columns. (Only needed for non-wildcard queries.) + // The projection list includes all candidate table columns + // whether or not they exist in the up-front schema. Handles + // the odd case where the reader claims a fixed schema, but + // adds a column later. + + if (! scanOrchestrator.scanProj.projectAll()) { + options.setProjectionSet(scanOrchestrator.scanProj.readerProjection()); + } + options.setSchema(tableSchema); + + // Create the table loader + + tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator, options.build()); + return tableLoader; + } + + public boolean hasSchema() { + return prevTableSchemaVersion >= 0; + } + + public void defineSchema() { + tableLoader.startEmptyBatch(); + endBatch(); + } + + public void startBatch() { + tableLoader.startBatch(); + } + + /** + * Build the final output batch by projecting columns from the three input sources + * to the output batch. First, build the metadata and/or null columns for the + * table row count. Then, merge the sources. + */ + + public void endBatch() { + + // Get the batch results in a container. + + tableContainer = tableLoader.harvest(); + + // If the schema changed, set up the final projection based on + // the new (or first) schema. + + if (prevTableSchemaVersion < tableLoader.schemaVersion()) { + reviseOutputProjection(); + } else { + + // Fill in the null and metadata columns. 
+ + populateNonDataColumns(); + } + rootTuple.setRowCount(tableContainer.getRecordCount()); + } + + private void populateNonDataColumns() { + int rowCount = tableContainer.getRecordCount(); + scanOrchestrator.metadataManager.load(rowCount); + rootTuple.loadNulls(rowCount); + } + + /** + * Create the list of null columns by comparing the SELECT list against the + * columns available in the batch schema. Create null columns for those that + * are missing. This is done for the first batch, and any time the schema + * changes. (For early-schema, the projection occurs once as the schema is set + * up-front and does not change.) For a SELECT *, the null column check + * only need be done if null columns were created when mapping from a prior + * schema. + */ + + private void reviseOutputProjection() { + + // Do the table-schema level projection; the final matching + // of projected columns to available columns. + + TupleMetadata tableSchema = tableLoader.harvestSchema(); + if (scanOrchestrator.schemaSmoother != null) { + doSmoothedProjection(tableSchema); + } else if (scanOrchestrator.scanProj.hasWildcard()) { + doWildcardProjection(tableSchema); + } else { + doExplicitProjection(tableSchema); + } + + // Combine metadata, nulls and batch data to form the final + // output container. Columns are created by the metadata and null + // loaders only in response to a batch, so create the first batch. + + rootTuple.buildNulls(scanOrchestrator.vectorCache); + scanOrchestrator.metadataManager.define(); + populateNonDataColumns(); + rootTuple.project(tableContainer, scanOrchestrator.outputContainer); + prevTableSchemaVersion = tableLoader.schemaVersion(); + } + + private void doSmoothedProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow( + new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns)); + scanOrchestrator.schemaSmoother.resolve(tableSchema, rootTuple); + } + + /** + * Query contains a wildcard. The schema-level projection includes + * all columns provided by the reader. + */ + + private void doWildcardProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow(null); + new WildcardSchemaProjection(scanOrchestrator.scanProj, + tableSchema, rootTuple, scanOrchestrator.schemaResolvers); + } + + /** + * Explicit projection: include only those columns actually + * requested by the query, which may mean filling in null + * columns for projected columns that don't actually exist + * in the table. + * + * @param tableSchema newly arrived schema + */ + + private void doExplicitProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow( + new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns)); + new ExplicitSchemaProjection(scanOrchestrator.scanProj, + tableSchema, rootTuple, + scanOrchestrator.schemaResolvers); + } + + @Override + public ValueVector vector(int index) { + return tableContainer.getValueVector(index).getValueVector(); + } + + public void close() { + RuntimeException ex = null; + try { + if (tableLoader != null) { + tableLoader.close(); + tableLoader = null; + } + } + catch (RuntimeException e) { + ex = e; + } + try { + if (rootTuple != null) { + rootTuple.close(); + rootTuple = null; + } + } + catch (RuntimeException e) { + ex = ex == null ? 
e : ex; + } + scanOrchestrator.metadataManager.endFile(); + if (ex != null) { + throw ex; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java index 83d40a310..f90f722b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java @@ -100,8 +100,6 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl; public class ScanLevelProjection { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class); - /** * Interface for add-on parsers, avoids the need to create * a single, tightly-coupled parser for all types of columns. @@ -128,12 +126,26 @@ public class ScanLevelProjection { // Internal state + protected boolean includesWildcard; protected boolean sawWildcard; // Output protected List<ColumnProjection> outputCols = new ArrayList<>(); + + /** + * Projection definition for the scan a whole. Parsed form of the input + * projection list. + */ + protected RequestedTuple outputProjection; + + /** + * Projection definition passed to each reader. This is the set of + * columns that the reader is asked to provide. + */ + + protected RequestedTuple readerProjection; protected boolean hasWildcard; protected boolean emptyProjection = true; @@ -158,6 +170,18 @@ public class ScanLevelProjection { for (ScanProjectionParser parser : parsers) { parser.bind(this); } + + // First pass: check if a wildcard exists. + + for (RequestedColumn inCol : outputProjection.projections()) { + if (inCol.isWildcard()) { + includesWildcard = true; + break; + } + } + + // Second pass: process remaining columns. + for (RequestedColumn inCol : outputProjection.projections()) { if (inCol.isWildcard()) { mapWildcard(inCol); @@ -169,6 +193,23 @@ public class ScanLevelProjection { for (ScanProjectionParser parser : parsers) { parser.build(); } + + // Create the reader projection which includes either all columns + // (saw a wildcard) or just the unresolved columns (which excludes + // implicit columns.) + + List<RequestedColumn> outputProj; + if (hasWildcard()) { + outputProj = null; + } else { + outputProj = new ArrayList<>(); + for (ColumnProjection col : outputCols) { + if (col instanceof UnresolvedColumn) { + outputProj.add(((UnresolvedColumn) col).element()); + } + } + } + readerProjection = RequestedTupleImpl.build(outputProj); } /** @@ -181,6 +222,7 @@ public class ScanLevelProjection { // Wildcard column: this is a SELECT * query. + assert includesWildcard; if (sawWildcard) { throw new IllegalArgumentException("Duplicate * entry in project list"); } @@ -245,6 +287,15 @@ public class ScanLevelProjection { } } + // If the project list has a wildcard, and the column is not one recognized + // by the specialized parsers above, then just ignore it. It is likely a duplicate + // column name. In any event, it will be processed by the Project operator on + // top of this scan. + + if (includesWildcard) { + return; + } + // This is a desired table column. 
addTableColumn( @@ -281,15 +332,6 @@ public class ScanLevelProjection { for (ScanProjectionParser parser : parsers) { parser.validateColumn(outCol); } - switch (outCol.nodeType()) { - case UnresolvedColumn.UNRESOLVED: - if (hasWildcard()) { - throw new IllegalArgumentException("Cannot select table columns and * together"); - } - break; - default: - break; - } } } @@ -333,6 +375,8 @@ public class ScanLevelProjection { public RequestedTuple rootProjection() { return outputProjection; } + public RequestedTuple readerProjection() { return readerProjection; } + @Override public String toString() { return new StringBuilder() diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java index fe78f5a75..a5d6ca2a1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java @@ -23,15 +23,10 @@ import java.util.List; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver; -import org.apache.drill.exec.physical.rowSet.ResultSetLoader; -import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder; -import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl; import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.ValueVector; /** @@ -154,212 +149,6 @@ public class ScanSchemaOrchestrator { public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE; public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT; - /** - * Orchestrates projection tasks for a single reader with the set that the - * scan operator manages. Vectors are reused across readers, but via a vector - * cache. All other state is distinct between readers. - */ - - public class ReaderSchemaOrchestrator implements VectorSource { - - private int readerBatchSize; - private ResultSetLoaderImpl tableLoader; - private int prevTableSchemaVersion = -1; - - /** - * Assembles the table, metadata and null columns into the final output - * batch to be sent downstream. The key goal of this class is to "smooth" - * schema changes in this output batch by absorbing trivial schema changes - * that occur across readers. 
- */ - - private ResolvedRow rootTuple; - private VectorContainer tableContainer; - - public ReaderSchemaOrchestrator() { - readerBatchSize = scanBatchRecordLimit; - } - - public void setBatchSize(int size) { - if (size > 0) { - readerBatchSize = Math.min(size, scanBatchRecordLimit); - } - } - - public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) { - OptionBuilder options = new OptionBuilder(); - options.setRowCountLimit(readerBatchSize); - options.setVectorCache(vectorCache); - options.setBatchSizeLimit(scanBatchByteLimit); - - // Set up a selection list if available and is a subset of - // table columns. (Only needed for non-wildcard queries.) - // The projection list includes all candidate table columns - // whether or not they exist in the up-front schema. Handles - // the odd case where the reader claims a fixed schema, but - // adds a column later. - - if (! scanProj.projectAll()) { - options.setProjectionSet(scanProj.rootProjection()); - } - options.setSchema(tableSchema); - - // Create the table loader - - tableLoader = new ResultSetLoaderImpl(allocator, options.build()); - - // If a schema is given, create a zero-row batch to announce the - // schema downstream in the form of an empty batch. - - if (tableSchema != null) { - tableLoader.startEmptyBatch(); - endBatch(); - } - - return tableLoader; - } - - public boolean hasSchema() { - return prevTableSchemaVersion >= 0; - } - - public void startBatch() { - tableLoader.startBatch(); - } - - /** - * Build the final output batch by projecting columns from the three input sources - * to the output batch. First, build the metadata and/or null columns for the - * table row count. Then, merge the sources. - */ - - public void endBatch() { - - // Get the batch results in a container. - - tableContainer = tableLoader.harvest(); - - // If the schema changed, set up the final projection based on - // the new (or first) schema. - - if (prevTableSchemaVersion < tableLoader.schemaVersion()) { - reviseOutputProjection(); - } else { - - // Fill in the null and metadata columns. - - populateNonDataColumns(); - } - rootTuple.setRowCount(tableContainer.getRecordCount()); - } - - private void populateNonDataColumns() { - int rowCount = tableContainer.getRecordCount(); - metadataManager.load(rowCount); - rootTuple.loadNulls(rowCount); - } - - /** - * Create the list of null columns by comparing the SELECT list against the - * columns available in the batch schema. Create null columns for those that - * are missing. This is done for the first batch, and any time the schema - * changes. (For early-schema, the projection occurs once as the schema is set - * up-front and does not change.) For a SELECT *, the null column check - * only need be done if null columns were created when mapping from a prior - * schema. - */ - - private void reviseOutputProjection() { - - // Do the table-schema level projection; the final matching - // of projected columns to available columns. - - TupleMetadata tableSchema = tableLoader.harvestSchema(); - if (schemaSmoother != null) { - doSmoothedProjection(tableSchema); - } else if (scanProj.hasWildcard()) { - doWildcardProjection(tableSchema); - } else { - doExplicitProjection(tableSchema); - } - - // Combine metadata, nulls and batch data to form the final - // output container. Columns are created by the metadata and null - // loaders only in response to a batch, so create the first batch. 
- - rootTuple.buildNulls(vectorCache); - metadataManager.define(); - populateNonDataColumns(); - rootTuple.project(tableContainer, outputContainer); - prevTableSchemaVersion = tableLoader.schemaVersion(); - } - - private void doSmoothedProjection(TupleMetadata tableSchema) { - rootTuple = new ResolvedRow( - new NullColumnBuilder(nullType, allowRequiredNullColumns)); - schemaSmoother.resolve(tableSchema, rootTuple); - } - - /** - * Query contains a wildcard. The schema-level projection includes - * all columns provided by the reader. - */ - - private void doWildcardProjection(TupleMetadata tableSchema) { - rootTuple = new ResolvedRow(null); - new WildcardSchemaProjection(scanProj, - tableSchema, rootTuple, schemaResolvers); - } - - /** - * Explicit projection: include only those columns actually - * requested by the query, which may mean filling in null - * columns for projected columns that don't actually exist - * in the table. - * - * @param tableSchema newly arrived schema - */ - - private void doExplicitProjection(TupleMetadata tableSchema) { - rootTuple = new ResolvedRow( - new NullColumnBuilder(nullType, allowRequiredNullColumns)); - new ExplicitSchemaProjection(scanProj, - tableSchema, rootTuple, - schemaResolvers); - } - - @Override - public ValueVector vector(int index) { - return tableContainer.getValueVector(index).getValueVector(); - } - - public void close() { - RuntimeException ex = null; - try { - if (tableLoader != null) { - tableLoader.close(); - tableLoader = null; - } - } - catch (RuntimeException e) { - ex = e; - } - try { - if (rootTuple != null) { - rootTuple.close(); - rootTuple = null; - } - } - catch (RuntimeException e) { - ex = ex == null ? e : ex; - } - metadataManager.endFile(); - if (ex != null) { - throw ex; - } - } - } - // Configuration /** @@ -367,16 +156,16 @@ public class ScanSchemaOrchestrator { * not set, the null type is the Drill default. */ - private MajorType nullType; + MajorType nullType; /** * Creates the metadata (file and directory) columns, if needed. */ - private MetadataManager metadataManager; - private final BufferAllocator allocator; - private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT; - private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT; + MetadataManager metadataManager; + final BufferAllocator allocator; + int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT; + int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT; private final List<ScanProjectionParser> parsers = new ArrayList<>(); /** @@ -389,7 +178,7 @@ public class ScanSchemaOrchestrator { List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>(); private boolean useSchemaSmoothing; - private boolean allowRequiredNullColumns; + boolean allowRequiredNullColumns; // Internal state @@ -402,14 +191,14 @@ public class ScanSchemaOrchestrator { * vectors rather than vector instances, this cache can be deprecated. 
*/ - private ResultVectorCacheImpl vectorCache; - private ScanLevelProjection scanProj; + ResultVectorCacheImpl vectorCache; + ScanLevelProjection scanProj; private ReaderSchemaOrchestrator currentReader; - private SchemaSmoother schemaSmoother; + SchemaSmoother schemaSmoother; // Output - private VectorContainer outputContainer; + VectorContainer outputContainer; public ScanSchemaOrchestrator(BufferAllocator allocator) { this.allocator = allocator; @@ -493,20 +282,12 @@ public class ScanSchemaOrchestrator { ScanProjectionParser parser = metadataManager.projectionParser(); if (parser != null) { - - // For compatibility with Drill 1.12, insert the file metadata - // parser before others so that, in a wildcard query, metadata - // columns appear before others (such as the `columns` column.) - // This is temporary and should be removed once the test framework - // is restored to Drill 1.11 functionality. - parsers.add(parser); } // Parse the projection list. scanProj = new ScanLevelProjection(projection, parsers); - if (scanProj.hasWildcard() && useSchemaSmoothing) { schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers); } @@ -526,7 +307,7 @@ public class ScanSchemaOrchestrator { public ReaderSchemaOrchestrator startReader() { closeReader(); - currentReader = new ReaderSchemaOrchestrator(); + currentReader = new ReaderSchemaOrchestrator(this); return currentReader; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java index c7bae278e..a75611432 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java @@ -61,8 +61,6 @@ import org.apache.drill.exec.record.metadata.TupleMetadata; public class SchemaLevelProjection { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class); - /** * Schema-level projection is customizable. Implement this interface, and * add an instance to the scan orchestrator, to perform custom mappings @@ -81,10 +79,7 @@ public class SchemaLevelProjection { protected SchemaLevelProjection( List<SchemaProjectionResolver> resolvers) { - if (resolvers == null) { - resolvers = new ArrayList<>(); - } - this.resolvers = resolvers; + this.resolvers = resolvers == null ? new ArrayList<>() : resolvers; for (SchemaProjectionResolver resolver : resolvers) { resolver.startResolution(); } @@ -97,6 +92,8 @@ public class SchemaLevelProjection { return; } } - throw new IllegalStateException("No resolver for column: " + col.nodeType()); + throw new IllegalStateException( + String.format("No resolver for column `%s` of type %d", + col.name(), col.nodeType())); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java index a5a52c075..155fcf886 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java @@ -22,6 +22,56 @@ * or may be "missing" with null values applied. The code here prepares * a run-time projection plan based on the actual table schema. * <p> + * Looks at schema as a set of transforms. 
+ * <ul>
+ * <li>Scan-level projection list from the query plan: The list of columns
+ * (or the wildcard) as requested by the user in the query. The planner
+ * determines which columns to project. In Drill, projection is speculative:
+ * it is a list of names which the planner hopes will appear in the data
+ * files. The reader must make up columns (the infamous nullable INT) when
+ * it turns out that no such column exists. Otherwise, the reader must figure
+ * out the data type of any column that does exist.
+ * <p>
+ * The scan project list defines the set of columns which the scan operator
+ * is obliged to send downstream. Ideally, the scan operator sends exactly the
+ * same schema (the project list with types filled in) for all batches. Since
+ * batches may come from different files, the scan operator is obligated to
+ * unify the schemas from those files (or blocks.)</li>
+ * <li>Reader (file)-level projection occurs for each reader. A single scan
+ * may use multiple readers to read data. Each reader may offer more information
+ * about the schema. For example, a Parquet reader can obtain schema information
+ * from the Parquet headers. A JDBC reader obtains schema information from the
+ * returned schema. This is called "early schema." File-based readers can at least
+ * add implicit file or partition columns.
+ * <p>
+ * The result is a refined schema: the scan-level schema with more information
+ * filled in. For Parquet, all projection information can be filled in. For
+ * CSV or JSON, we can only add file metadata information, but not yet the
+ * actual data schema.</li>
+ * <li>Batch-level schema: once a reader reads actual data, it knows
+ * exactly what it read. This is the "schema on read" model. Thus, after reading
+ * a batch, any remaining uncertainty about the projected schema is removed.
+ * The actual data defines the data types and so on.
+ * <p>
+ * Readers such as JSON and CSV are "late schema": they don't know the data
+ * schema until they read the file. This is true "schema on read." Further, for
+ * JSON, the data may change from one batch to the next as the reader "discovers"
+ * fields that did not appear in earlier batches. This requires some amount of
+ * "schema smoothing": the ability to preserve a consistent output schema even
+ * as the input schema jiggles around some.</li>
+ * </ul>
+ * <p>
+ * The goal of this mechanism is to handle the above use cases cleanly, in a
+ * common set of classes, and to avoid the need for each reader to figure out
+ * all these issues for itself (as was the case with earlier versions of
+ * Drill.)
+ * <p>
+ * Because these issues are complex, the code itself is complex. To make the
+ * code easier to manage, each bit of functionality is encapsulated in a
+ * distinct class. Classes combine via composition to create a "framework"
+ * suitable for each kind of reader: whether it be early or late schema,
+ * file-based or something else, etc.
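+ * <p>
+ * As a rough sketch, the three levels map onto the scan orchestrator
+ * lifecycle as follows. (Hypothetical driver code: the orchestrator and
+ * loader calls are the ones described in this package, but the scan-level
+ * setup call and the loop variables are illustrative only.)
+ * <pre>{@code
+ * ScanSchemaOrchestrator scanner = new ScanSchemaOrchestrator(allocator);
+ * scanner.build(projList);                  // 1. Scan-level projection
+ * ReaderSchemaOrchestrator reader = scanner.startReader();
+ * ResultSetLoader loader =
+ *     reader.makeTableLoader(tableSchema);  // 2. Reader-level refinement
+ *                                           //    (schema is null if late-schema)
+ * while (readerHasMoreData) {
+ *   reader.startBatch();
+ *   // ... the reader writes rows via the loader ...
+ *   reader.endBatch();                      // 3. Batch-level resolution
+ * }
+ * }</pre>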
+ * <p>
 * The core concept is one of successive refinement of the project
 * list through a set of rewrites:
 * <ul>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
index c1e383eb6..f464bae72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
@@ -37,7 +37,7 @@ public class ImpliedTupleRequest implements RequestedTuple {
       new ImpliedTupleRequest(false);
   public static final List<RequestedColumn> EMPTY_COLS = new ArrayList<>();

-  private boolean allProjected;
+  private final boolean allProjected;

   public ImpliedTupleRequest(boolean allProjected) {
     this.allProjected = allProjected;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index cd782c766..4643c57d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -73,7 +73,7 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace;

 public class RequestedTupleImpl implements RequestedTuple {

-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);

   private final RequestedColumnImpl parent;
   private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>();
@@ -86,6 +86,13 @@ public class RequestedTupleImpl implements RequestedTuple {
     this.parent = parent;
   }

+  public RequestedTupleImpl(List<RequestedColumn> cols) {
+    parent = null;
+    for (RequestedColumn col : cols) {
+      projection.add(col.name(), col);
+    }
+  }
+
   @Override
   public RequestedColumn get(String colName) {
     return projection.get(colName.toLowerCase());
@@ -119,10 +126,43 @@
   }

   /**
+   * Create a requested tuple projection from a rewritten top-level
+   * projection list. The columns within the list have already been parsed to
+   * pick out arrays, maps and scalars. The list must not include the
+   * wildcard: a wildcard projection must be passed in as a null list. A
+   * null list means project all, an empty list means project nothing;
+   * otherwise, project only the columns in the list.
+   *
+   * @param projList top-level, parsed columns
+   * @return the tuple projection for the top-level row
+   */
+
+  public static RequestedTuple build(List<RequestedColumn> projList) {
+    if (projList == null) {
+      return new ImpliedTupleRequest(true);
+    }
+    if (projList.isEmpty()) {
+      return new ImpliedTupleRequest(false);
+    }
+    return new RequestedTupleImpl(projList);
+  }
+
+  /**
    * Parse a projection list. The list should consist of a list of column names;
-   * any wildcards should have been processed by the caller. An empty list means
+   * or wildcards. An empty list means
    * nothing is projected. A null list means everything is projected (that is, a
    * null list here is equivalent to a wildcard in the SELECT statement.)
+   * <p>
+   * The projection list may include both a wildcard and column names (as in
+   * the case of implicit columns.)
This results in a final list that both
+   * says that everything is projected and provides the list of columns.
+   * <p>
+   * Parsing is used at two different times. First, to parse the list from
+   * the physical operator. This list may have the form above: an explicit
+   * wildcard and/or additional columns. Then, this class is used again to
+   * prepare the physical projection used when reading. In this case, wildcards
+   * should be removed, implicit columns pulled out, and just the list of
+   * read-level columns should remain.
    *
    * @param projList
    *          the list of projected columns, or null if no projection is to be
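+   * <p>
+   * A usage sketch of the factory above, assuming a hypothetical pre-parsed
+   * column list {@code cols}; the three cases mirror the code in
+   * {@code build()}:
+   * <pre>{@code
+   * // Null list: project everything (the wildcard case).
+   * RequestedTuple all = RequestedTupleImpl.build(null);
+   * // Empty list: project nothing.
+   * RequestedTuple none = RequestedTupleImpl.build(new ArrayList<>());
+   * // Non-empty list: project only the listed columns.
+   * RequestedTuple some = RequestedTupleImpl.build(cols);
+   * }</pre>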