Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java | 26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java | 20
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java | 42
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java | 52
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java | 71
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java | 34
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java | 37
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java | 76
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java | 15
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java | 26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java | 85
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java | 230
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java | 66
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java | 241
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java | 50
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java | 44
22 files changed, 831 insertions, 323 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index bbf12d4b8..b366d34d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -238,8 +238,11 @@ class ReaderState {
/**
* Prepare the schema for this reader. Called for the first reader within a
- * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>. If
- * this is an early-schema reader, then the result set loader already has
+ * scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>.
+ * Asks the reader if it can provide a schema-only empty batch by calling
+ * the reader's <tt>defineSchema()</tt> method. If this is an early-schema
+ * reader, and it can provide a schema, then it should create an empty
+ * batch so that the result set loader already has
* the proper value vectors set up. If this is a late-schema reader, we must
* read one batch to get the schema, then set aside the data for the next
* call to <tt>next()</tt>.
@@ -255,9 +258,10 @@ class ReaderState {
* <li>If it turned out that the file was
* empty when trying to read the schema, <tt>open()</tt> returned false
* and this method should never be called.
- * <li>Otherwise, if a schema was available, then the schema is already
- * set up in the result set loader as the result of schema negotiation, and
- * this method simply returns <tt>true</tt>.
+ * <li>Otherwise, the reader does not know if it is the first reader or
+ * not. The call to <tt>defineSchema()</tt> notifies the reader that it
+ * is the first one. The reader should then set up the result set loader
+ * with an empty batch.
* </ul>
* <p>
* Semantics for late-schema readers:
@@ -280,14 +284,12 @@ class ReaderState {
protected boolean buildSchema() {
- VectorContainer container = reader.output();
-
- if (container != null) {
+ if (reader.defineSchema()) {
// Bind the output container to the output of the scan operator.
// This returns an empty batch with the schema filled in.
- scanOp.containerAccessor.setContainer(container);
+ scanOp.containerAccessor.setContainer(reader.output());
schemaVersion = reader.schemaVersion();
return true;
}
@@ -297,7 +299,8 @@ class ReaderState {
if (! next()) {
return false;
}
- container = reader.output();
+ VectorContainer container = reader.output();
+ schemaVersion = reader.schemaVersion();
if (container.getRecordCount() == 0) {
return true;
}
@@ -374,8 +377,7 @@ class ReaderState {
private boolean readBatch() {
- // Try to read a batch. This may fail. If so, clean up the
- // mess.
+ // Try to read a batch. This may fail. If so, clean up the mess.
boolean more;
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
index c0985b538..61de584f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
@@ -84,8 +84,6 @@ import org.apache.drill.exec.record.VectorContainer;
public interface RowBatchReader {
- enum Result { OK, LAST_BATCH, EOF }
-
/**
* Name used when reporting errors. Can simply be the class name.
*
@@ -111,6 +109,22 @@ public interface RowBatchReader {
boolean open();
/**
+ * Called for the first reader within a scan. Allows the reader to
+ * provide an empty batch with only the schema filled in. Readers that
+ * are "early schema" (know the schema up front) should return true
+ * and create an empty batch. Readers that are "late schema" should
+ * return false. In that case, the scan operator will ask the reader
+ * to load an actual data batch, and infer the schema from that batch.
+ * <p>
+ * This step is optional and is purely for performance.
+ *
+ * @return true if this reader can define (and has defined) an empty batch
+ * to describe the schema, false otherwise
+ */
+
+ boolean defineSchema();
+
+ /**
* Read the next batch. Reading continues until either EOF,
* or until the mutator indicates that the batch is full.
* The batch is considered valid if it is non-empty. Returning
@@ -129,7 +143,7 @@ public interface RowBatchReader {
* <tt>next()</tt> should be called again, <tt>false</tt> to indicate
* that EOF was reached
*
- * @throws RutimeException (<tt>UserException</tt> preferred) if an
+ * @throws RuntimeException (<tt>UserException</tt> preferred) if an
* error occurs that should fail the query.
*/
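
As a sketch of this contract (the reader class, its constructor flag, and the createEmptySchemaBatch() helper below are hypothetical, not part of this change):

    public class ExampleReader implements RowBatchReader {
      private final boolean earlySchema; // assumed known when the reader is built

      public ExampleReader(boolean earlySchema) {
        this.earlySchema = earlySchema;
      }

      @Override
      public boolean defineSchema() {
        if (earlySchema) {
          createEmptySchemaBatch(); // hypothetical: sets up empty vectors for output()
          return true;              // scan emits a schema-only batch first
        }
        return false;               // scan will call next() and infer the schema
      }

      // Remaining RowBatchReader methods omitted from this sketch.
    }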
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
index 9e174146f..04b2c7eb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
@@ -20,15 +20,23 @@ package org.apache.drill.exec.physical.impl.scan;
import org.apache.drill.exec.ops.OperatorContext;
/**
- * Interface to the set of readers, and reader schema, that the
- * scan operator manages. The reader factory creates and returns
- * the readers to use for the scan, as determined by the specific
- * physical plan. The reader factory also
- * translates from the select list provided
- * in the physical plan to the actual columns returned from the
- * scan operator. The translation is reader-specific; this
- * interface allows the scan operator to trigger various
- * lifecycle events.
+ * Interface to the set of readers, and reader schema, that the scan operator
+ * manages. The reader factory creates and returns the readers to use for the
+ * scan, as determined by the specific physical plan. The reader factory also
+ * translates from the select list provided in the physical plan to the actual
+ * columns returned from the scan operator. The translation is reader-specific;
+ * this interface allows the scan operator to trigger various lifecycle events.
+ * <p>
+ * This interface decouples the scan implementation from the generic tasks
+ * needed to implement Drill's Volcano iterator protocol for operators, and
+ * Drill's schema and batch semantics. A scan implementation need only
+ * implement this interface to add plugin-specific scan behavior.
+ * <p>
+ * While this interface allows a wide variety of implementations, the intent is
+ * that most actual scanners will use the "managed" framework that handles the
+ * routine projection, vector management and other tasks that tend to be common
+ * across scanners. See {@link ScanSchemaOrchestrator} for the managed
+ * framework.
*/
public interface ScanOperatorEvents {
@@ -46,11 +54,25 @@ public interface ScanOperatorEvents {
void bind(OperatorContext context);
+ /**
+ * A scanner typically reads multiple data sources (such as files or
+ * file blocks.) A batch reader handles each read. This method returns
+ * the next reader in whatever sequence that this scan defines.
+ * <p>
+ * The preferred implementation is to create each batch reader in this
+ * call to minimize resource usage. Production queries may read
+ * thousands of files or blocks, so incremental reader creation can be
+ * far more efficient than creating readers at the start of the scan.
+ *
+ * @return a batch reader for one of the scan elements within the
+ * scan physical plan for this scan operator
+ */
+
RowBatchReader nextReader();
/**
* Called when the scan operator itself is closed. Indicates that no more
- * readers are available (or will be opened).
+ * readers are available.
*/
void close();
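
A sketch of the incremental pattern (the split iterator and the makeReader() helper are assumptions, not part of this interface):

    private Iterator<FileSplit> splits; // set from the scan plan at bind() time

    @Override
    public RowBatchReader nextReader() {
      if (!splits.hasNext()) {
        return null;                    // assumption: null signals no more readers
      }
      // Create the reader only now, when it is about to be opened.
      return makeReader(splits.next()); // hypothetical per-format factory method
    }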
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index f6c72b120..966a03901 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -35,14 +35,24 @@ import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput;
* expands to `columns`.</li>
* <li>If the columns array appears, then no other table columns
* can appear.</li>
- * <li>If the columns array appears, then the wildcard cannot also
- * appear, unless that wildcard expanded to be `columns` as
- * described above.</li>
+ * <li>Both `columns` and the wildcard can appear for queries such
+ * as:<code><pre>
+ * select * from dfs.`multilevel/csv`
+ * where columns[1] < 1000</pre>
+ * </code></li>
* <li>The query can select specific elements such as `columns`[2].
* In this case, only array elements can appear, not the unindexed
* `columns` column.</li>
+ * <li>It is possible for `columns` to appear twice. In this case,
+ * the project operator will make a copy.</li>
* </ul>
* <p>
+ * To handle these cases, the general rule is: allow any number
+ * of wildcard or `columns` appearances in the input projection, but
+ * collapse them all down to a single occurrence of `columns` in the
+ * output projection. (Upstream code will prevent `columns` from
+ * appearing twice in its non-indexed form.)
+ * <p>
* It falls to this parser to detect a not-uncommon user error, a
* query such as the following:<pre><code>
* SELECT max(columns[1]) AS col1
@@ -83,7 +93,8 @@ public class ColumnsArrayParser implements ScanProjectionParser {
@Override
public boolean parse(RequestedColumn inCol) {
if (requireColumnsArray && inCol.isWildcard()) {
- expandWildcard();
+ createColumnsCol(
+ new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL));
return true;
}
if (! inCol.nameEquals(ColumnsArrayManager.COLUMNS_COL)) {
@@ -113,41 +124,24 @@ public class ColumnsArrayParser implements ScanProjectionParser {
.build(logger);
}
}
-
- // Special `columns` array column.
-
- columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
- builder.addTableColumn(columnsArrayCol);
+ createColumnsCol(inCol);
return true;
}
- /**
- * Query contained SELECT *, and we know that the reader supports only
- * the `columns` array; go ahead and expand the wildcard to the only
- * possible column.
- */
+ private void createColumnsCol(RequestedColumn inCol) {
+
+ // Special `columns` array column. Allow multiple, but
+ // project only one.
- private void expandWildcard() {
if (columnsArrayCol != null) {
- throw UserException
- .validationError()
- .message("Cannot select columns[] and `*` together")
- .build(logger);
+ return;
}
- columnsArrayCol = new UnresolvedColumnsArrayColumn(
- new RequestedColumnImpl(builder.rootProjection(), ColumnsArrayManager.COLUMNS_COL));
+ columnsArrayCol = new UnresolvedColumnsArrayColumn(inCol);
builder.addTableColumn(columnsArrayCol);
}
@Override
- public void validate() {
- if (builder.hasWildcard() && columnsArrayCol != null) {
- throw UserException
- .validationError()
- .message("Cannot select `columns` and `*` together")
- .build(logger);
- }
- }
+ public void validate() { }
@Override
public void validateColumn(ColumnProjection col) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
index f9674dc2a..ae8502b52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataColumnsParser.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.scan.file;
+import java.util.HashSet;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -38,6 +40,7 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
private final FileMetadataManager metadataManager;
private final Pattern partitionPattern;
private ScanLevelProjection builder;
+ private final Set<Integer> referencedPartitions = new HashSet<>();
// Output
@@ -64,6 +67,11 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
if (defn != null) {
return buildMetadataColumn(defn, inCol);
}
+ if (inCol.isWildcard()) {
+ buildWildcard();
+
+ // Don't consider this a match.
+ }
return false;
}
@@ -80,11 +88,18 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
// Partition column
- builder.addMetadataColumn(
- new PartitionColumn(
- inCol.name(),
- Integer.parseInt(m.group(1))));
- hasImplicitCols = true;
+ int partitionIndex = Integer.parseInt(m.group(1));
+ if (! referencedPartitions.contains(partitionIndex)) {
+ builder.addMetadataColumn(
+ new PartitionColumn(
+ inCol.name(),
+ partitionIndex));
+
+ // Remember the partition for later wildcard expansion
+
+ referencedPartitions.add(partitionIndex);
+ hasImplicitCols = true;
+ }
return true;
}
@@ -107,8 +122,52 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
return true;
}
+ private void buildWildcard() {
+ if (metadataManager.useLegacyWildcardExpansion &&
+ metadataManager.useLegacyExpansionLocation) {
+
+ // Star column: this is a SELECT * query.
+
+ // Old-style wildcard handling inserts all partition columns in
+ // the scanner, removes them in Project.
+ // Fill in the file metadata columns. Can do here because the
+ // set is constant across all files.
+
+ expandPartitions();
+ }
+ }
+
@Override
- public void validate() { }
+ public void validate() {
+
+ // Expand partitions if using a wildcard appears, if using the
+ // feature to expand partitions for wildcards, and we want the
+ // partitions after data columns.
+
+ if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion &&
+ ! metadataManager.useLegacyExpansionLocation) {
+ expandPartitions();
+ }
+ }
+
+ private void expandPartitions() {
+
+ // Legacy wildcard expansion: include the file partitions for this file.
+ // This is a disadvantage for a * query: files at different directory
+ // levels will have different numbers of columns. Would be better to
+ // return this data as an array at some point.
+ // Append this after the *, keeping the * for later expansion.
+
+ for (int i = 0; i < metadataManager.partitionCount(); i++) {
+ if (referencedPartitions.contains(i)) {
+ continue;
+ }
+ builder.addMetadataColumn(new PartitionColumn(
+ metadataManager.partitionName(i), i));
+ referencedPartitions.add(i);
+ }
+ hasImplicitCols = true;
+ }
@Override
public void validateColumn(ColumnProjection outCol) { }
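
The dedup-and-expand rule above can be seen in isolation with a small, self-contained sketch (hypothetical names) for a query that projects dir1 explicitly over a three-level directory tree:

    Set<Integer> referencedPartitions = new HashSet<>();
    List<String> output = new ArrayList<>();

    referencedPartitions.add(1);          // the query projected dir1 explicitly
    output.add("dir1");

    int partitionCount = 3;               // deepest directory level across all files
    for (int i = 0; i < partitionCount; i++) {
      if (referencedPartitions.contains(i)) {
        continue;                         // never project the same partition twice
      }
      output.add("dir" + i);
      referencedPartitions.add(i);
    }
    // output is now [dir1, dir0, dir2]: explicit columns keep their place,
    // wildcard expansion appends the remaining partitions in order.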
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
index fe4332a34..ba49a9f54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadataManager.java
@@ -53,6 +53,31 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
protected final String partitionDesignator;
protected List<FileMetadataColumnDefn> implicitColDefns = new ArrayList<>();
protected Map<String, FileMetadataColumnDefn> fileMetadataColIndex = CaseInsensitiveMap.newHashMap();
+
+ /**
+ * Indicates whether to expand partition columns when the query contains a wildcard.
+ * Supports queries such as the following:<code><pre>
+ * select * from dfs.`partitioned-dir`
+ * </pre></code>
+ * in which the output columns will be (columns, dir0) if the partitioned directory
+ * has one level of nesting.
+ *
+ * See {@link TestImplicitFileColumns#testImplicitColumns}
+ */
+ protected final boolean useLegacyWildcardExpansion;
+
+ /**
+ * In legacy mode, above, Drill expands partition columns whenever the
+ * wildcard appears. Drill 1.1 - 1.11 put expanded partition columns after
+ * data columns. This is actually a better position as it minimizes changes to
+ * the row layout for files at different depths. Drill 1.12 moved them before
+ * data columns: at the location of the wildcard.
+ * <p>
+ * This flag, when set, uses the Drill 1.12 position. Later enhancements
+ * can unset this flag to go back to the future: use the preferred location
+ * after other columns.
+ */
+ protected final boolean useLegacyExpansionLocation;
private final FileMetadataColumnsParser parser;
// Internal state
@@ -84,7 +109,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
*/
public FileMetadataManager(OptionSet optionManager,
+ boolean useLegacyWildcardExpansion,
+ boolean useLegacyExpansionLocation,
Path rootDir, List<Path> files) {
+ this.useLegacyWildcardExpansion = useLegacyWildcardExpansion;
+ this.useLegacyExpansionLocation = useLegacyExpansionLocation;
scanRootDir = rootDir;
partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
@@ -117,6 +146,11 @@ public class FileMetadataManager implements MetadataManager, SchemaProjectionRes
}
}
+ public FileMetadataManager(OptionSet optionManager,
+ Path rootDir, List<Path> files) {
+ this(optionManager, false, false, rootDir, files);
+ }
+
private int computeMaxPartition(List<Path> files) {
int maxLen = 0;
for (Path filePath : files) {
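
A minimal usage sketch of the new constructor; the option manager, root path and file list are assumed to come from the enclosing scan setup:

    FileMetadataManager metadataManager = new FileMetadataManager(
        optionManager,
        true,         // useLegacyWildcardExpansion: expand partitions for `*`
        true,         // useLegacyExpansionLocation: Drill 1.12 position, at the wildcard
        scanRootDir,  // Path: root directory of the scan
        files);       // List<Path>: files to be read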
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
index 609e9f05f..6ecf0cf05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java
@@ -32,19 +32,46 @@ import org.apache.hadoop.mapred.FileSplit;
/**
* The file scan framework adds into the scan framework support for implicit
- * file metadata columns.
+ * file metadata columns. The file scan framework brings together a number of
+ * components:
+ * <ul>
+ * <li>The projection list provided by the physical operator definition. This
+ * list identifies the set of "output" columns which this framework is obliged
+ * to produce.</li>
+ * <li>The set of files and/or blocks to read.</li>
+ * <li>The file system configuration to use for working with the files
+ * or blocks.</li>
+ * <li>The factory class to create a reader for each of the files or blocks
+ * defined above. (Readers are created one-by-one as files are read.)</li>
+ * <li>Options as defined by the base class.</li>
+ * </ul>
+ * <p>
+ * See {@link AbstractScanFramework} for details.
*/
public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiator> {
- public interface FileReaderCreator {
+ /**
+ * Creates a batch reader on demand. Unlike earlier versions of Drill,
+ * this framework creates readers one by one, when they are needed.
+ * Doing so avoids excessive resource demands that come from creating
+ * potentially thousands of readers up front.
+ * <p>
+ * The reader itself is unique to each file type. This factory interface
+ * gives the framework a uniform way to create the file-specific reader
+ * on demand.
+ */
+
+ public interface FileReaderFactory {
ManagedReader<FileSchemaNegotiator> makeBatchReader(
DrillFileSystem dfs,
FileSplit split) throws ExecutionSetupException;
}
/**
- * Implementation of the file-level schema negotiator.
+ * Implementation of the file-level schema negotiator. At present, no
+ * file-specific features exist. This class shows, however, where we would
+ * add such features.
*/
public static class FileSchemaNegotiatorImpl extends SchemaNegotiatorImpl
@@ -55,12 +82,12 @@ public class FileScanFramework extends BaseFileScanFramework<FileSchemaNegotiato
}
}
- private final FileReaderCreator readerCreator;
+ private final FileReaderFactory readerCreator;
public FileScanFramework(List<SchemaPath> projection,
List<? extends FileWork> files,
Configuration fsConf,
- FileReaderCreator readerCreator) {
+ FileReaderFactory readerCreator) {
super(projection, files, fsConf);
this.readerCreator = readerCreator;
}
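
Wiring a factory into the framework might look like the following sketch; CsvBatchReader is a hypothetical reader class, and projection, files and fsConf are assumed to be in scope:

    FileScanFramework framework = new FileScanFramework(
        projection, files, fsConf,
        new FileReaderFactory() {
          @Override
          public ManagedReader<FileSchemaNegotiator> makeBatchReader(
              DrillFileSystem dfs, FileSplit split) throws ExecutionSetupException {
            // Created on demand: only one reader is live per scan operator.
            return new CsvBatchReader(dfs, split);
          }
        });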
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
index 4a15ff787..bf132651b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/MetadataColumn.java
@@ -44,4 +44,17 @@ public abstract class MetadataColumn extends ResolvedColumn implements ConstantC
public String name() { return schema.getName(); }
public abstract MetadataColumn resolve(FileMetadata fileInfo, VectorSource source, int sourceIndex);
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" schema=\"")
+ .append(schema.toString())
+ .append(", value=")
+ .append(value)
+ .append("]")
+ .toString();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
index 54079013a..d285261c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/AbstractScanFramework.java
@@ -40,15 +40,91 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
* config options, and implements the matching "managed reader". All details
* of setup, projection, and so on are handled by the framework and the components
* that the framework builds upon.
+ *
+ * <h4>Inputs</h4>
+ *
+ * At this basic level, a scan framework requires just a few simple inputs:
+ * <ul>
+ * <li>The projection list provided by the physical operator definition. This
+ * list identifies the set of "output" columns which this framework is obliged
+ * to produce.</li>
+ * <li>The operator context which provides access to a memory allocator and
+ * other plumbing items.</li>
+ * <li>A method to create a reader for each of the files or blocks
+ * to be scanned. (Readers are created one-by-one as files are read.)</li>
+ * <li>The data type to use for projected columns which the reader cannot
+ * provide. (Drill allows such columns and fills in null values: traditionally
+ * nullable Int, but customizable here.)</li>
+ * <li>Various other options.</li>
+ * </ul>
+ *
+ * <h4>Orchestration</h4>
+ *
+ * The above is sufficient to drive the entire scan operator functionality.
+ * Projection is done generically and is the same for all files. Only the
+ * reader (created via the factory class) differs from one type of file to
+ * another.
+ * <p>
+ * The framework achieves the work described below by composing a large
+ * set of detailed classes, each of which performs some specific task. This
+ * structure leaves the reader to simply infer schema and read data.
+ * <p>
+ * In particular, rather than do all the orchestration here (which would tie
+ * that logic to the scan operation), the detailed work is delegated to the
+ * {@link ScanSchemaOrchestrator} class, with this class as a "shim" between
+ * the Scan events API and the schema orchestrator implementation.
+ *
+ * <h4>Reader Integration</h4>
+ *
+ * The details of how a file is structured, how a schema is inferred, how
+ * data is decoded: all that is encapsulated in the reader. The only real
+ * interaction between the reader and the framework is:
+ * <ul>
+ * <li>The reader "negotiates" a schema with the framework. The framework
+ * knows the projection list from the query plan, knows something about
+ * data types (whether a column should be scalar, a map or an array), and
+ * knows about the schema already defined by prior readers. The reader knows
+ * what schema it can produce (if "early schema.") The schema negotiator
+ * class handles this task.</li>
+ * <li>The reader reads data from the file and populates value vectors a
+ * batch at a time. The framework creates the result set loader to use for
+ * this work. The schema negotiator returns that loader to the reader, which
+ * uses it during read.
+ * <p>
+ * It is important to note that the result set loader also defines a schema:
+ * the schema requested by the reader. If the reader wants to read three
+ * columns, a, b, and c, then that is the schema that the result set loader
+ * supports. This is true even if the query plan only wants column a, or
+ * wants columns c, a. The framework handles the projection task so the
+ * reader does not have to worry about it. Reading an unwanted column
+ * is low cost: the result set loader will have provided a "dummy" column
+ * writer that simply discards the value. This is just as fast as having the
+ * reader use if-statements or a table to determine which columns to save.
+ * <p>
+ * A reader may be "late schema", true "schema on read." In this case, the
+ * reader simply tells the result set loader to create a new column writer
+ * on the fly. The framework will work out if that new column is to be
+ * projected and will return either a real column writer (projected column)
+ * or a dummy column writer (unprojected column.)</li>
+ * <li>The reader then reads batches of data until all data is read. The
+ * result set loader signals when a batch is full; the reader should not
+ * worry about this detail itself.</li>
+ * <li>The reader then releases its resources.</li>
+ * </ul>
*/
public abstract class AbstractScanFramework<T extends SchemaNegotiator> implements ScanOperatorEvents {
+ // Inputs
+
protected final List<SchemaPath> projection;
protected MajorType nullType;
protected int maxBatchRowCount;
protected int maxBatchByteCount;
protected OperatorContext context;
+
+ // Internal state
+
protected ScanSchemaOrchestrator scanOrchestrator;
public AbstractScanFramework(List<SchemaPath> projection) {
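
The read loop described above, sketched from the reader's side; loader is the ResultSetLoader obtained from the schema negotiator, and nextLine() stands in for a hypothetical per-format decode step:

    @Override
    public boolean next() {
      RowSetLoader writer = loader.writer();
      while (!writer.isFull()) {            // result set loader signals a full batch
        String line = nextLine();           // hypothetical row decode; null at EOF
        if (line == null) {
          return false;                     // EOF; rows already written are delivered
        }
        writer.start();
        writer.scalar("a").setString(line); // a dummy writer if "a" is unprojected
        writer.save();
      }
      return true;                          // batch full; next() will be called again
    }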
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index 631881265..dead9cb46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -65,9 +65,13 @@ public interface SchemaNegotiator {
* columns during the read.
*
* @param schema the table schema if known at open time
+ * @param isComplete true if the schema is complete: if it can be used
+ * to define an empty schema-only batch for the first reader. Set to
+ * false if the schema is partial: if the reader must read rows to
+ * determine the full schema
*/
- void setTableSchema(TupleMetadata schema);
+ void setTableSchema(TupleMetadata schema, boolean isComplete);
/**
* Set the preferred batch size (which may be overridden by the
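
An early-schema reader might use the revised method as in this sketch; the SchemaBuilder details follow Drill's row-set utilities and should be treated as assumptions:

    @Override
    public boolean open(SchemaNegotiator negotiator) {
      TupleMetadata schema = new SchemaBuilder()
          .add("a", MinorType.VARCHAR)
          .addNullable("b", MinorType.INT)
          .buildSchema();
      negotiator.setTableSchema(schema, true); // complete: enables a schema-only batch
      loader = negotiator.build();             // result set loader for the read
      return true;
    }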
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index 46f363dd5..0841049fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -53,6 +53,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
protected final AbstractScanFramework<?> basicFramework;
private final ShimBatchReader<? extends SchemaNegotiator> shim;
protected TupleMetadata tableSchema;
+ protected boolean isSchemaComplete;
protected int batchSize = ValueVector.MAX_ROW_COUNT;
public SchemaNegotiatorImpl(AbstractScanFramework<?> framework, ShimBatchReader<? extends SchemaNegotiator> shim) {
@@ -66,8 +67,9 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
}
@Override
- public void setTableSchema(TupleMetadata schema) {
+ public void setTableSchema(TupleMetadata schema, boolean isComplete) {
tableSchema = schema;
+ this.isSchemaComplete = schema != null && isComplete;
}
@Override
@@ -97,4 +99,6 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
public boolean isProjectionEmpty() {
return basicFramework.scanOrchestrator().isProjectNone();
}
+
+ public boolean isSchemaComplete() { return isSchemaComplete; }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
index 0dc3c5736..a97b32968 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.scan.framework;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
-import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.record.VectorContainer;
@@ -44,6 +44,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
protected final AbstractScanFramework<T> manager;
protected final ManagedReader<T> reader;
protected final ReaderSchemaOrchestrator readerOrchestrator;
+ protected SchemaNegotiatorImpl schemaNegotiator;
protected ResultSetLoader tableLoader;
/**
@@ -96,10 +97,19 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
}
@Override
+ public boolean defineSchema() {
+ if (schemaNegotiator.isSchemaComplete()) {
+ readerOrchestrator.defineSchema();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
public boolean next() {
// The reader may report EOF, but the result set loader might
- // have a lookhead row.
+ // have a lookahead row.
if (eof && ! tableLoader.hasRows()) {
return false;
@@ -181,6 +191,7 @@ public class ShimBatchReader<T extends SchemaNegotiator> implements RowBatchRead
}
public ResultSetLoader build(SchemaNegotiatorImpl schemaNegotiator) {
+ this.schemaNegotiator = schemaNegotiator;
readerOrchestrator.setBatchSize(schemaNegotiator.batchSize);
tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.tableSchema);
return tableLoader;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
index efd881bb1..096b8447b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/package-info.java
@@ -29,7 +29,7 @@
* is an old version without a new column c, while file B includes the column.
* And so on.
* <p>
- * The scan operator here works to ensure schema continuity as much as
+ * The scan operator works to ensure schema continuity as much as
* possible, smoothing out "soft" schema changes that are simply artifacts of
* reading a collection of files. Only "hard" changes (true changes) are
* passed downstream.
@@ -157,5 +157,29 @@
* output batch in the order specified by the original SELECT list (or table order,
* if the original SELECT had a wildcard.) Fortunately, this just involves
* moving around pointers to vectors; no actual data is moved during projection.
+ *
+ * <h4>Class Structure</h4>
+ *
+ * Some of the key classes here include:
+ * <ul>
+ * <li>{@link RowBatchReader} an extremely simple interface for reading data.
+ * We would like many developers to create new plugins and readers. The simplified
+ * interface pushes all complexity into the scan framework, leaving the reader to
+ * just read.</li>
+ * <li>{@link ShimBatchReader} an implementation of the above that adds, on top
+ * of the simplified API, the structure needed to work with the result set loader.
+ * (The base interface is agnostic about how rows are read.)</li>
+ * <li>{@link SchemaNegotiator} an interface that allows a batch reader to
+ * "negotiate" a schema with the scan framework. The scan framework knows the
+ * columns that are to be projected. The reader knows what columns it can offer.
+ * The schema negotiator works out how to combine the two. It expresses the result
+ * as a result set loader. Column writers are defined for all columns that the
+ * reader wants to read, but only the materialized (projected) columns have actual
+ * vectors behind them. The non-projected columns are "free-wheeling" "dummy"
+ * writers.
+ * </li>
+ * </ul>
+ * <p>
+ * And, yes, sorry for the terminology. File "readers" read from files, but
+ * use column "writers" to write to value vectors.
*/
package org.apache.drill.exec.physical.impl.scan.framework;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
index 3e302a1e8..d7de30ad6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/package-info.java
@@ -36,9 +36,90 @@
* <p>
* See {@link ScanOperatorExec} for details of the scan operator protocol
* and components.
+ *
+ * <h4>Traditional Class Structure</h4>
+ * The original design was simple, but required each reader to handle many
+ * detailed tasks.
+ * <pre><code>
+ * +------------+ +-----------+
+ * | Scan Batch | +---> | ScanBatch |
+ * | Creator | | +-----------+
+ * +------------+ | |
+ * | | |
+ * v | |
+ * +------------+ | v
+ * | Format | ---+ +---------------+
+ * | Plugin | -----> | Record Reader |
+ * +------------+ +---------------+
+ *
+ * </code></pre>
+ *
+ * The scan batch creator is unique to each storage plugin and is created
+ * based on the physical operator configuration ("pop config"). The
+ * scan batch creator delegates to the format plugin to create both the
+ * scan batch (the scan operator) and the set of readers which the scan
+ * batch will manage.
+ * <p>
+ * The scan batch
+ * provides a <code>Mutator</code> that creates the vectors used by the
+ * record readers. Schema continuity comes from reusing the Mutator from one
+ * file/block to the next.
+ * <p>
+ * One characteristic of this system is that all the record readers are
+ * created up front. If we must read 1000 blocks, we'll create 1000 record
+ * readers. Developers must be very careful to only allocate resources when
+ * the reader is opened, and release resources when the reader is closed.
+ * Else, resource bloat becomes a large problem.
+ *
+ * <h4>Revised Class Structure</h4>
+ *
+ * The new design is more complex because it divides tasks up into separate
+ * classes. The class structure is larger, but each class is smaller, more
+ * focused and does just one task.
+ * <pre><code>
+ * +------------+ +---------------+
+ * | Scan Batch | -------> | Format Plugin |
+ * | Creator | +---------------+
+ * +------------+ / | \
+ * / | \
+ * +---------------------+ | \ +---------------+
+ * | OperatorRecordBatch | | +---->| ScanFramework |
+ * +---------------------+ | | +---------------+
+ * v | |
+ * +------------------+ |
+ * | ScanOperatorExec | |
+ * +------------------+ v
+ * | +--------------+
+ * +----------> | Batch Reader |
+ * +--------------+
+ * </code></pre>
+ *
+ * Here, the scan batch creator again delegates to the format plugin. The
+ * format plugin creates three objects:
+ * <ul>
+ * <li>The <code>OperatorRecordBatch</code>, which encapsulates the Volcano
+ * iterator protocol. It also holds onto the output batch. This allows the
+ * operator implementation to just focus on its specific job.</li>
+ * <li>The <code>ScanOperatorExec</code> is the operator implementation for
+ * the new result-set-loader based scan.</li>
+ * <li>The scan framework is specific to each kind of reader. It handles
+ * everything which is unique to that reader. Rather than inheriting from
+ * the scan itself, the framework follows the strategy pattern: it says how
+ * to do a scan for the target format.</li>
+ * </ul>
+ *
+ * The overall structure uses the "composition" pattern: what is combined
+ * into a small set of classes in the traditional model is broken out into
+ * focused classes in the revised model.
+ * <p>
+ * A key part of the scan strategy is the batch reader. ("Batch" because
+ * it reads an entire batch at a time, using the result set loader.) The
+ * framework creates batch readers one by one as needed. Resource bloat
+ * is less of an issue because only one batch reader instance exists at
+ * any time for each scan operator instance.
* <p>
- * See the "managed" package for a reusable framework for handling the
- * details of batches, schema and so on.
+ * Each of the above is further broken down into additional classes to
+ * handle projection and so on.
*/
package org.apache.drill.exec.physical.impl.scan;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
index 41cc59582..c0bcfa3b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
*/
public class ExplicitSchemaProjection extends SchemaLevelProjection {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplicitSchemaProjection.class);
public ExplicitSchemaProjection(ScanLevelProjection scanProj,
TupleMetadata tableSchema,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
new file mode 100644
index 000000000..029b6a005
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Orchestrates projection tasks for a single reader within the set that the
+ * scan operator manages. Vectors are reused across readers, but via a vector
+ * cache. All other state is distinct between readers.
+ */
+
+public class ReaderSchemaOrchestrator implements VectorSource {
+
+ private final ScanSchemaOrchestrator scanOrchestrator;
+ private int readerBatchSize;
+ private ResultSetLoaderImpl tableLoader;
+ private int prevTableSchemaVersion = -1;
+
+ /**
+ * Assembles the table, metadata and null columns into the final output
+ * batch to be sent downstream. The key goal of this class is to "smooth"
+ * schema changes in this output batch by absorbing trivial schema changes
+ * that occur across readers.
+ */
+
+ private ResolvedRow rootTuple;
+ private VectorContainer tableContainer;
+
+ public ReaderSchemaOrchestrator(ScanSchemaOrchestrator scanSchemaOrchestrator) {
+ scanOrchestrator = scanSchemaOrchestrator;
+ readerBatchSize = scanOrchestrator.scanBatchRecordLimit;
+ }
+
+ public void setBatchSize(int size) {
+ if (size > 0) {
+ readerBatchSize = Math.min(size, scanOrchestrator.scanBatchRecordLimit);
+ }
+ }
+
+ public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
+ OptionBuilder options = new OptionBuilder();
+ options.setRowCountLimit(readerBatchSize);
+ options.setVectorCache(scanOrchestrator.vectorCache);
+ options.setBatchSizeLimit(scanOrchestrator.scanBatchByteLimit);
+
+ // Set up a selection list if available and is a subset of
+ // table columns. (Only needed for non-wildcard queries.)
+ // The projection list includes all candidate table columns
+ // whether or not they exist in the up-front schema. Handles
+ // the odd case where the reader claims a fixed schema, but
+ // adds a column later.
+
+ if (! scanOrchestrator.scanProj.projectAll()) {
+ options.setProjectionSet(scanOrchestrator.scanProj.readerProjection());
+ }
+ options.setSchema(tableSchema);
+
+ // Create the table loader
+
+ tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator, options.build());
+ return tableLoader;
+ }
+
+ public boolean hasSchema() {
+ return prevTableSchemaVersion >= 0;
+ }
+
+ public void defineSchema() {
+ tableLoader.startEmptyBatch();
+ endBatch();
+ }
+
+ public void startBatch() {
+ tableLoader.startBatch();
+ }
+
+ /**
+ * Build the final output batch by projecting columns from the three input sources
+ * to the output batch. First, build the metadata and/or null columns for the
+ * table row count. Then, merge the sources.
+ */
+
+ public void endBatch() {
+
+ // Get the batch results in a container.
+
+ tableContainer = tableLoader.harvest();
+
+ // If the schema changed, set up the final projection based on
+ // the new (or first) schema.
+
+ if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
+ reviseOutputProjection();
+ } else {
+
+ // Fill in the null and metadata columns.
+
+ populateNonDataColumns();
+ }
+ rootTuple.setRowCount(tableContainer.getRecordCount());
+ }
+
+ private void populateNonDataColumns() {
+ int rowCount = tableContainer.getRecordCount();
+ scanOrchestrator.metadataManager.load(rowCount);
+ rootTuple.loadNulls(rowCount);
+ }
+
+ /**
+ * Create the list of null columns by comparing the SELECT list against the
+ * columns available in the batch schema. Create null columns for those that
+ * are missing. This is done for the first batch, and any time the schema
+ * changes. (For early-schema, the projection occurs once as the schema is set
+ * up-front and does not change.) For a SELECT *, the null column check
+ * only need be done if null columns were created when mapping from a prior
+ * schema.
+ */
+
+ private void reviseOutputProjection() {
+
+ // Do the table-schema level projection; the final matching
+ // of projected columns to available columns.
+
+ TupleMetadata tableSchema = tableLoader.harvestSchema();
+ if (scanOrchestrator.schemaSmoother != null) {
+ doSmoothedProjection(tableSchema);
+ } else if (scanOrchestrator.scanProj.hasWildcard()) {
+ doWildcardProjection(tableSchema);
+ } else {
+ doExplicitProjection(tableSchema);
+ }
+
+ // Combine metadata, nulls and batch data to form the final
+ // output container. Columns are created by the metadata and null
+ // loaders only in response to a batch, so create the first batch.
+
+ rootTuple.buildNulls(scanOrchestrator.vectorCache);
+ scanOrchestrator.metadataManager.define();
+ populateNonDataColumns();
+ rootTuple.project(tableContainer, scanOrchestrator.outputContainer);
+ prevTableSchemaVersion = tableLoader.schemaVersion();
+ }
+
+ private void doSmoothedProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(
+ new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns));
+ scanOrchestrator.schemaSmoother.resolve(tableSchema, rootTuple);
+ }
+
+ /**
+ * Query contains a wildcard. The schema-level projection includes
+ * all columns provided by the reader.
+ */
+
+ private void doWildcardProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(null);
+ new WildcardSchemaProjection(scanOrchestrator.scanProj,
+ tableSchema, rootTuple, scanOrchestrator.schemaResolvers);
+ }
+
+ /**
+ * Explicit projection: include only those columns actually
+ * requested by the query, which may mean filling in null
+ * columns for projected columns that don't actually exist
+ * in the table.
+ *
+ * @param tableSchema newly arrived schema
+ */
+
+ private void doExplicitProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(
+ new NullColumnBuilder(scanOrchestrator.nullType, scanOrchestrator.allowRequiredNullColumns));
+ new ExplicitSchemaProjection(scanOrchestrator.scanProj,
+ tableSchema, rootTuple,
+ scanOrchestrator.schemaResolvers);
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return tableContainer.getValueVector(index).getValueVector();
+ }
+
+ public void close() {
+ RuntimeException ex = null;
+ try {
+ if (tableLoader != null) {
+ tableLoader.close();
+ tableLoader = null;
+ }
+ }
+ catch (RuntimeException e) {
+ ex = e;
+ }
+ try {
+ if (rootTuple != null) {
+ rootTuple.close();
+ rootTuple = null;
+ }
+ }
+ catch (RuntimeException e) {
+ ex = ex == null ? e : ex;
+ }
+ scanOrchestrator.metadataManager.endFile();
+ if (ex != null) {
+ throw ex;
+ }
+ }
+}
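
The per-reader lifecycle, as a rough sketch of how the shim drives this class (loop condition and variable names are assumptions):

    ReaderSchemaOrchestrator readerOrch = new ReaderSchemaOrchestrator(scanOrchestrator);
    ResultSetLoader loader = readerOrch.makeTableLoader(tableSchema); // schema may be null
    while (readerHasMoreData) {
      readerOrch.startBatch();
      // ... the reader writes rows through the loader ...
      readerOrch.endBatch();  // merge table, metadata and null columns for downstream
    }
    readerOrch.close();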
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
index 83d40a310..f90f722b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -100,8 +100,6 @@ import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
public class ScanLevelProjection {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class);
-
/**
* Interface for add-on parsers, avoids the need to create
* a single, tightly-coupled parser for all types of columns.
@@ -128,12 +126,26 @@ public class ScanLevelProjection {
// Internal state
+ protected boolean includesWildcard;
protected boolean sawWildcard;
// Output
protected List<ColumnProjection> outputCols = new ArrayList<>();
+
+ /**
+ * Projection definition for the scan as a whole. Parsed form of the input
+ * projection list.
+ */
+
protected RequestedTuple outputProjection;
+
+ /**
+ * Projection definition passed to each reader. This is the set of
+ * columns that the reader is asked to provide.
+ */
+
+ protected RequestedTuple readerProjection;
protected boolean hasWildcard;
protected boolean emptyProjection = true;
@@ -158,6 +170,18 @@ public class ScanLevelProjection {
for (ScanProjectionParser parser : parsers) {
parser.bind(this);
}
+
+ // First pass: check if a wildcard exists.
+
+ for (RequestedColumn inCol : outputProjection.projections()) {
+ if (inCol.isWildcard()) {
+ includesWildcard = true;
+ break;
+ }
+ }
+
+ // Second pass: process remaining columns.
+
for (RequestedColumn inCol : outputProjection.projections()) {
if (inCol.isWildcard()) {
mapWildcard(inCol);
@@ -169,6 +193,23 @@ public class ScanLevelProjection {
for (ScanProjectionParser parser : parsers) {
parser.build();
}
+
+ // Create the reader projection which includes either all columns
+ // (saw a wildcard) or just the unresolved columns (which excludes
+ // implicit columns.)
+
+ List<RequestedColumn> outputProj;
+ if (hasWildcard()) {
+ outputProj = null;
+ } else {
+ outputProj = new ArrayList<>();
+ for (ColumnProjection col : outputCols) {
+ if (col instanceof UnresolvedColumn) {
+ outputProj.add(((UnresolvedColumn) col).element());
+ }
+ }
+ }
+ readerProjection = RequestedTupleImpl.build(outputProj);
}
/**
@@ -181,6 +222,7 @@ public class ScanLevelProjection {
// Wildcard column: this is a SELECT * query.
+ assert includesWildcard;
if (sawWildcard) {
throw new IllegalArgumentException("Duplicate * entry in project list");
}
@@ -245,6 +287,15 @@ public class ScanLevelProjection {
}
}
+ // If the project list has a wildcard, and the column is not one recognized
+ // by the specialized parsers above, then just ignore it. It is likely a duplicate
+ // column name. In any event, it will be processed by the Project operator on
+ // top of this scan.
+
+ if (includesWildcard) {
+ return;
+ }
+
// This is a desired table column.
addTableColumn(
@@ -281,15 +332,6 @@ public class ScanLevelProjection {
for (ScanProjectionParser parser : parsers) {
parser.validateColumn(outCol);
}
- switch (outCol.nodeType()) {
- case UnresolvedColumn.UNRESOLVED:
- if (hasWildcard()) {
- throw new IllegalArgumentException("Cannot select table columns and * together");
- }
- break;
- default:
- break;
- }
}
}
@@ -333,6 +375,8 @@ public class ScanLevelProjection {
public RequestedTuple rootProjection() { return outputProjection; }
+ public RequestedTuple readerProjection() { return readerProjection; }
+
@Override
public String toString() {
return new StringBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index fe78f5a75..a5d6ca2a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -23,15 +23,10 @@ import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
-import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
-import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
-import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
/**
@@ -154,212 +149,6 @@ public class ScanSchemaOrchestrator {
public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
- /**
- * Orchestrates projection tasks for a single reader with the set that the
- * scan operator manages. Vectors are reused across readers, but via a vector
- * cache. All other state is distinct between readers.
- */
-
- public class ReaderSchemaOrchestrator implements VectorSource {
-
- private int readerBatchSize;
- private ResultSetLoaderImpl tableLoader;
- private int prevTableSchemaVersion = -1;
-
- /**
- * Assembles the table, metadata and null columns into the final output
- * batch to be sent downstream. The key goal of this class is to "smooth"
- * schema changes in this output batch by absorbing trivial schema changes
- * that occur across readers.
- */
-
- private ResolvedRow rootTuple;
- private VectorContainer tableContainer;
-
- public ReaderSchemaOrchestrator() {
- readerBatchSize = scanBatchRecordLimit;
- }
-
- public void setBatchSize(int size) {
- if (size > 0) {
- readerBatchSize = Math.min(size, scanBatchRecordLimit);
- }
- }
-
- public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
- OptionBuilder options = new OptionBuilder();
- options.setRowCountLimit(readerBatchSize);
- options.setVectorCache(vectorCache);
- options.setBatchSizeLimit(scanBatchByteLimit);
-
- // Set up a selection list if available and is a subset of
- // table columns. (Only needed for non-wildcard queries.)
- // The projection list includes all candidate table columns
- // whether or not they exist in the up-front schema. Handles
- // the odd case where the reader claims a fixed schema, but
- // adds a column later.
-
- if (! scanProj.projectAll()) {
- options.setProjectionSet(scanProj.rootProjection());
- }
- options.setSchema(tableSchema);
-
- // Create the table loader
-
- tableLoader = new ResultSetLoaderImpl(allocator, options.build());
-
- // If a schema is given, create a zero-row batch to announce the
- // schema downstream in the form of an empty batch.
-
- if (tableSchema != null) {
- tableLoader.startEmptyBatch();
- endBatch();
- }
-
- return tableLoader;
- }
-
- public boolean hasSchema() {
- return prevTableSchemaVersion >= 0;
- }
-
- public void startBatch() {
- tableLoader.startBatch();
- }
-
- /**
- * Build the final output batch by projecting columns from the three input sources
- * to the output batch. First, build the metadata and/or null columns for the
- * table row count. Then, merge the sources.
- */
-
- public void endBatch() {
-
- // Get the batch results in a container.
-
- tableContainer = tableLoader.harvest();
-
- // If the schema changed, set up the final projection based on
- // the new (or first) schema.
-
- if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
- reviseOutputProjection();
- } else {
-
- // Fill in the null and metadata columns.
-
- populateNonDataColumns();
- }
- rootTuple.setRowCount(tableContainer.getRecordCount());
- }
-
- private void populateNonDataColumns() {
- int rowCount = tableContainer.getRecordCount();
- metadataManager.load(rowCount);
- rootTuple.loadNulls(rowCount);
- }
-
- /**
- * Create the list of null columns by comparing the SELECT list against the
- * columns available in the batch schema. Create null columns for those that
- * are missing. This is done for the first batch, and any time the schema
- * changes. (For early-schema, the projection occurs once as the schema is set
- * up-front and does not change.) For a SELECT *, the null column check
- * only need be done if null columns were created when mapping from a prior
- * schema.
- */
-
- private void reviseOutputProjection() {
-
- // Do the table-schema level projection; the final matching
- // of projected columns to available columns.
-
- TupleMetadata tableSchema = tableLoader.harvestSchema();
- if (schemaSmoother != null) {
- doSmoothedProjection(tableSchema);
- } else if (scanProj.hasWildcard()) {
- doWildcardProjection(tableSchema);
- } else {
- doExplicitProjection(tableSchema);
- }
-
- // Combine metadata, nulls and batch data to form the final
- // output container. Columns are created by the metadata and null
- // loaders only in response to a batch, so create the first batch.
-
- rootTuple.buildNulls(vectorCache);
- metadataManager.define();
- populateNonDataColumns();
- rootTuple.project(tableContainer, outputContainer);
- prevTableSchemaVersion = tableLoader.schemaVersion();
- }
-
- private void doSmoothedProjection(TupleMetadata tableSchema) {
- rootTuple = new ResolvedRow(
- new NullColumnBuilder(nullType, allowRequiredNullColumns));
- schemaSmoother.resolve(tableSchema, rootTuple);
- }
-
- /**
- * Query contains a wildcard. The schema-level projection includes
- * all columns provided by the reader.
- */
-
- private void doWildcardProjection(TupleMetadata tableSchema) {
- rootTuple = new ResolvedRow(null);
- new WildcardSchemaProjection(scanProj,
- tableSchema, rootTuple, schemaResolvers);
- }
-
- /**
- * Explicit projection: include only those columns actually
- * requested by the query, which may mean filling in null
- * columns for projected columns that don't actually exist
- * in the table.
- *
- * @param tableSchema newly arrived schema
- */
-
- private void doExplicitProjection(TupleMetadata tableSchema) {
- rootTuple = new ResolvedRow(
- new NullColumnBuilder(nullType, allowRequiredNullColumns));
- new ExplicitSchemaProjection(scanProj,
- tableSchema, rootTuple,
- schemaResolvers);
- }
-
- @Override
- public ValueVector vector(int index) {
- return tableContainer.getValueVector(index).getValueVector();
- }
-
- public void close() {
- RuntimeException ex = null;
- try {
- if (tableLoader != null) {
- tableLoader.close();
- tableLoader = null;
- }
- }
- catch (RuntimeException e) {
- ex = e;
- }
- try {
- if (rootTuple != null) {
- rootTuple.close();
- rootTuple = null;
- }
- }
- catch (RuntimeException e) {
- ex = ex == null ? e : ex;
- }
- metadataManager.endFile();
- if (ex != null) {
- throw ex;
- }
- }
- }
-
// Configuration
/**
@@ -367,16 +156,16 @@ public class ScanSchemaOrchestrator {
* not set, the null type is the Drill default.
*/
- private MajorType nullType;
+ MajorType nullType;
/**
* Creates the metadata (file and directory) columns, if needed.
*/
- private MetadataManager metadataManager;
- private final BufferAllocator allocator;
- private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
- private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
+ MetadataManager metadataManager;
+ final BufferAllocator allocator;
+ int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
+ int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
private final List<ScanProjectionParser> parsers = new ArrayList<>();
/**
@@ -389,7 +178,7 @@ public class ScanSchemaOrchestrator {
List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>();
private boolean useSchemaSmoothing;
- private boolean allowRequiredNullColumns;
+ boolean allowRequiredNullColumns;
// Internal state
@@ -402,14 +191,14 @@ public class ScanSchemaOrchestrator {
* vectors rather than vector instances, this cache can be deprecated.
*/
- private ResultVectorCacheImpl vectorCache;
- private ScanLevelProjection scanProj;
+ ResultVectorCacheImpl vectorCache;
+ ScanLevelProjection scanProj;
private ReaderSchemaOrchestrator currentReader;
- private SchemaSmoother schemaSmoother;
+ SchemaSmoother schemaSmoother;
// Output
- private VectorContainer outputContainer;
+ VectorContainer outputContainer;
public ScanSchemaOrchestrator(BufferAllocator allocator) {
this.allocator = allocator;
@@ -493,20 +282,12 @@ public class ScanSchemaOrchestrator {
ScanProjectionParser parser = metadataManager.projectionParser();
if (parser != null) {
-
- // For compatibility with Drill 1.12, insert the file metadata
- // parser before others so that, in a wildcard query, metadata
- // columns appear before others (such as the `columns` column.)
- // This is temporary and should be removed once the test framework
- // is restored to Drill 1.11 functionality.
-
parsers.add(parser);
}
// Parse the projection list.
scanProj = new ScanLevelProjection(projection, parsers);
-
if (scanProj.hasWildcard() && useSchemaSmoothing) {
schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers);
}
@@ -526,7 +307,7 @@ public class ScanSchemaOrchestrator {
public ReaderSchemaOrchestrator startReader() {
closeReader();
- currentReader = new ReaderSchemaOrchestrator();
+ currentReader = new ReaderSchemaOrchestrator(this);
return currentReader;
}
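
For orientation, a minimal sketch of how driver code might exercise the refactored
classes now that ReaderSchemaOrchestrator lives in its own file and receives its
parent orchestrator through the constructor. The build() entry point and the
row-writing loop are assumptions inferred from the hunks above, not verbatim
framework code; check the classes before copying.

    import java.util.List;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
    import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
    import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    public class ScanLifecycleSketch {

      // Drives a single reader through its lifecycle. tableSchema is non-null
      // for early-schema readers, null for late-schema readers.
      void runOneReader(BufferAllocator allocator, List<SchemaPath> projection,
          TupleMetadata tableSchema) {
        ScanSchemaOrchestrator scan = new ScanSchemaOrchestrator(allocator);
        scan.build(projection);            // parse the scan-level projection (assumed name)

        ReaderSchemaOrchestrator reader = scan.startReader();
        ResultSetLoader tableLoader = reader.makeTableLoader(tableSchema);

        boolean more = true;               // placeholder for the reader's own EOF test
        while (more) {
          reader.startBatch();
          // ... the reader writes rows via tableLoader.writer() ...
          reader.endBatch();               // merges table, metadata and null columns
          more = false;
        }
        scan.close();                      // also closes the current reader
      }
    }
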
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
index c7bae278e..a75611432 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
@@ -61,8 +61,6 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
public class SchemaLevelProjection {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class);
-
/**
* Schema-level projection is customizable. Implement this interface, and
* add an instance to the scan orchestrator, to perform custom mappings
@@ -81,10 +79,7 @@ public class SchemaLevelProjection {
protected SchemaLevelProjection(
List<SchemaProjectionResolver> resolvers) {
- if (resolvers == null) {
- resolvers = new ArrayList<>();
- }
- this.resolvers = resolvers;
+ this.resolvers = resolvers == null ? new ArrayList<>() : resolvers;
for (SchemaProjectionResolver resolver : resolvers) {
resolver.startResolution();
}
@@ -97,6 +92,8 @@ public class SchemaLevelProjection {
return;
}
}
- throw new IllegalStateException("No resolver for column: " + col.nodeType());
+ throw new IllegalStateException(
+ String.format("No resolver for column `%s` of type %d",
+ col.name(), col.nodeType()));
}
}
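
The sharper error message above fires when no registered resolver claims a
column. For reference, a skeletal resolver; the resolveColumn() signature is an
assumption based on how SchemaLevelProjection drives its resolver list, so
verify it against the interface before copying.

    import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
    import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
    import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    public class NoOpResolver implements SchemaProjectionResolver {

      @Override
      public void startResolution() {
        // Reset any per-schema state before a new resolution pass.
      }

      @Override
      public boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple,
          TupleMetadata tableSchema) {
        // Return true only for columns this resolver recognizes; returning
        // false lets the next resolver try, and triggers the exception above
        // if no resolver claims the column.
        return false;
      }
    }
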
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
index a5a52c075..155fcf886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
@@ -22,6 +22,56 @@
* or may be "missing" with null values applied. The code here prepares
* a run-time projection plan based on the actual table schema.
* <p>
+ * This package views the schema as a set of transforms.
+ * <ul>
+ * <li>Scan-level projection list from the query plan: The list of columns
+ * (or the wildcard) as requested by the user in the query. The planner
+ * determines which columns to project. In Drill, projection is speculative:
+ * it is a list of names which the planner hopes will appear in the data
+ * files. The reader must make up a column (the infamous nullable INT) when
+ * it turns out that no such column exists; otherwise, the reader must work
+ * out the data type for each column that does exist.
+ * <p>
+ * The scan project list defines the set of columns which the scan operator
+ * is obliged to send downstream. Ideally, the scan operator sends exactly the
+ * same schema (the project list with types filled in) for all batches. Since
+ * batches may come from different files, the scan operator must unify the
+ * schemas from those files (or blocks.)</li>
+ * <li>Reader (file)-level projection occurs for each reader. A single scan
+ * may use multiple readers to read data. Each reader may offer more information
+ * about the schema. For example, a Parquet reader can obtain schema information
+ * from the Parquet headers. A JDBC reader obtains schema information from the
+ * returned schema. This is called "early schema." File-based readers can at least
+ * add implicit file or partition columns.
+ * <p>
+ * The result is a refined schema: the scan level schema with more information
+ * filled in. For Parquet, all projection information can be filled in. For
+ * CSV or JSON, we can only add file metadata information, but not yet the
+ * actual data schema.</li>
+ * <li>Batch-level schema: once a reader reads actual data, it knows
+ * exactly what it read. This is the "schema on read" model. Thus, after
+ * reading a batch, any remaining uncertainty about the projected schema is
+ * removed: the actual data defines the data types and so on.
+ * <p>
+ * Readers such as JSON and CSV are "late schema": they don't know the data
+ * schema until they read the file. This is true "schema on read." Further, for
+ * JSON, the data may change from one batch to the next as the reader "discovers"
+ * fields that did not appear in earlier batches. This requires some amount of
+ * "schema smoothing": the ability to preserve a consistent output schema even
+ * as the input schema jiggles around some.</li>
+ * </ul>
+ * <p>
+ * The goal of this mechanism is to handle the above use cases cleanly, in a
+ * common set of classes, and to avoid the need for each reader to work out
+ * all these issues for itself (as was the case with earlier versions of
+ * Drill.)
+ * <p>
+ * Because these issues are complex, the code itself is complex. To make the
+ * code easier to manage, each bit of functionality is encapsulated in a
+ * distinct class. Classes combine via composition to create a "framework"
+ * suitable for each kind of reader: whether it be early or late schema,
+ * file-based or something else, etc.
+ * <p>
* The core concept is one of successive refinement of the project
* list through a set of rewrites:
* <ul>
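
To ground the "speculative projection" idea described in the javadoc above, a
small sketch of building the scan-level projection for SELECT a, b, c. Only
names are known at this stage; a column missing from the data later
materializes as Drill's nullable INT. The empty parser list is a
simplification: a real framework registers its metadata parsers here.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;

    public class ScanProjectionSketch {

      // Builds the scan-level projection for SELECT a, b, c. Types are
      // unknown here; they are resolved per reader and per batch.
      static ScanLevelProjection projectABC() {
        List<SchemaPath> projection = Arrays.asList(
            SchemaPath.getSimplePath("a"),
            SchemaPath.getSimplePath("b"),
            SchemaPath.getSimplePath("c"));
        // No custom parsers; a file-based framework would add its metadata
        // parser so that columns such as `filename` resolve at scan level.
        return new ScanLevelProjection(projection, new ArrayList<>());
      }
    }
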
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
index c1e383eb6..f464bae72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/ImpliedTupleRequest.java
@@ -37,7 +37,7 @@ public class ImpliedTupleRequest implements RequestedTuple {
new ImpliedTupleRequest(false);
public static final List<RequestedColumn> EMPTY_COLS = new ArrayList<>();
- private boolean allProjected;
+ private final boolean allProjected;
public ImpliedTupleRequest(boolean allProjected) {
this.allProjected = allProjected;
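
This class and the next cooperate: the static build() method added to
RequestedTupleImpl in the following file maps a caller's list onto either an
ImpliedTupleRequest or a parsed tuple. A usage sketch, assuming RequestedColumn
is the nested interface of RequestedTuple as used in these files:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
    import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
    import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;

    public class ProjectionBuildSketch {

      // Exercises the three cases distinguished by RequestedTupleImpl.build().
      static void demo(List<RequestedColumn> parsedCols) {
        RequestedTuple all  = RequestedTupleImpl.build(null);              // null: project everything
        RequestedTuple none = RequestedTupleImpl.build(new ArrayList<>()); // empty: project nothing
        RequestedTuple some = RequestedTupleImpl.build(parsedCols);        // explicit column list
      }
    }
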
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index cd782c766..4643c57d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -73,7 +73,7 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace;
public class RequestedTupleImpl implements RequestedTuple {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestedTupleImpl.class);
private final RequestedColumnImpl parent;
private final TupleNameSpace<RequestedColumn> projection = new TupleNameSpace<>();
@@ -86,6 +86,13 @@ public class RequestedTupleImpl implements RequestedTuple {
this.parent = parent;
}
+  /**
+   * Creates a top-level projection from a list of already-parsed columns.
+   */
+  public RequestedTupleImpl(List<RequestedColumn> cols) {
+ parent = null;
+ for (RequestedColumn col : cols) {
+ projection.add(col.name(), col);
+ }
+ }
+
@Override
public RequestedColumn get(String colName) {
return projection.get(colName.toLowerCase());
@@ -119,10 +126,43 @@ public class RequestedTupleImpl implements RequestedTuple {
}
/**
+ * Create a requested tuple projection from a rewritten top-level
+ * projection list. The columns within the list have already been parsed to
+ * pick out arrays, maps and scalars. The list must not include the
+ * wildcard: a wildcard projection must be passed in as a null list. A
+ * null list means project everything, an empty list means project
+ * nothing, and any other list means project only the listed columns.
+ *
+ * @param projList top-level, parsed columns
+ * @return the tuple projection for the top-level row
+ */
+
+ public static RequestedTuple build(List<RequestedColumn> projList) {
+ if (projList == null) {
+ return new ImpliedTupleRequest(true);
+ }
+ if (projList.isEmpty()) {
+ return new ImpliedTupleRequest(false);
+ }
+ return new RequestedTupleImpl(projList);
+ }
+
+ /**
* Parse a projection list. The list should consist of a list of column names;
- * any wildcards should have been processed by the caller. An empty list means
+ * or wildcards. An empty list means
* nothing is projected. A null list means everything is projected (that is, a
* null list here is equivalent to a wildcard in the SELECT statement.)
+ * <p>
+ * The projection list may include both a wildcard and column names (as in
+ * the case of implicit columns.) This results in a final list that both
+ * says that everything is projected, and provides the list of columns.
+ * <p>
+ * Parsing is used at two different times. First, to parse the list from
+ * the physical operator. This covers the case above: an explicit wildcard
+ * and/or additional columns. Then, this class is used again to prepare the
+ * physical projection used when reading. In this case, wildcards should
+ * be removed, implicit columns pulled out, and just the list of read-level
+ * columns should remain.
*
* @param projList
* the list of projected columns, or null if no projection is to be